parallel.py 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. from tqdm import tqdm
  2. from concurrent.futures import ProcessPoolExecutor, as_completed
  3. def parallel_process(array, function, n_jobs=16, use_kwargs=False, front_num=0):
  4. """
  5. A parallel version of the map function with a progress bar.
  6. Args:
  7. array (array-like): An array to iterate over.
  8. function (function): A python function to apply to the elements of array
  9. n_jobs (int, default=16): The number of cores to use
  10. use_kwargs (boolean, default=False): Whether to consider the elements of array as dictionaries of
  11. keyword arguments to function
  12. front_num (int, default=3): The number of iterations to run serially before kicking off the parallel job.
  13. Useful for catching bugs
  14. Returns:
  15. [function(array[0]), function(array[1]), ...]
  16. """
  17. # We run the first few iterations serially to catch bugs
  18. if front_num > 0:
  19. front = [function(**a) if use_kwargs else function(a)
  20. for a in array[:front_num]]
  21. else:
  22. front = []
  23. # If we set n_jobs to 1, just run a list comprehension. This is useful for benchmarking and debugging.
  24. if n_jobs == 1:
  25. return front + [function(**a) if use_kwargs else function(a) for a in tqdm(array[front_num:])]
  26. # Assemble the workers
  27. with ProcessPoolExecutor(max_workers=n_jobs) as pool:
  28. # Pass the elements of array into function
  29. if use_kwargs:
  30. futures = [pool.submit(function, **a) for a in array[front_num:]]
  31. else:
  32. futures = [pool.submit(function, a) for a in array[front_num:]]
  33. kwargs = {
  34. 'total': len(futures),
  35. 'unit': 'it',
  36. 'unit_scale': True,
  37. 'leave': True
  38. }
  39. # Print out the progress as tasks complete
  40. for f in tqdm(as_completed(futures), **kwargs):
  41. pass
  42. out = []
  43. # Get the results from the futures.
  44. for i, future in tqdm(enumerate(futures)):
  45. try:
  46. out.append(future.result())
  47. except Exception as e:
  48. out.append(e)
  49. return front + out