Hello! I am looking for a way to run a list of remote functions with limited parallelism (N): as soon as one remote function finishes, another should get scheduled, until all of them have finished. At most N remote functions should be in flight at a time.
In the docs, I found ray.util.multiprocessing.Pool (Distributed multiprocessing.Pool — Ray v1.9.2). However, running something like Pool(N) will fail if the cluster does not have N CPUs. Is there a way I can instantiate a Pool with N processes and have the cluster autoscale up to that amount, while already processing functions on the CPUs that are available? Something like:
```python
data = [x for x in range(1000)]  # assume the cluster has 20 CPUs but can scale up to 100
pool = Pool(100)                 # cluster begins to scale to 100 CPUs
results = pool.imap(f, data)     # runs `f` 1000 times, with at most 100 at the same time
```
The other approach I found in the docs is to use ray.wait (Pattern: Using ray.wait to limit the number of in-flight tasks — Ray v1.9.2), but it has the drawback of running in fixed chunks.
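To make that drawback concrete, here is a small scheduling simulation (plain Python, no Ray needed) comparing the makespan of running in fixed chunks of N against keeping N tasks in flight and backfilling as soon as one finishes. The task durations are made up purely for illustration:

```python
import heapq

def chunked_makespan(durations, n):
    """Run tasks in fixed chunks of n: the next chunk starts only
    after every task in the current chunk has finished."""
    t = 0.0
    for i in range(0, len(durations), n):
        t += max(durations[i:i + n])
    return t

def streaming_makespan(durations, n):
    """Keep n tasks in flight and start a new task the moment one
    finishes (what a rolling ray.wait pattern achieves)."""
    slots = [0.0] * n  # time at which each of the n slots frees up
    heapq.heapify(slots)
    for d in durations:
        start = heapq.heappop(slots)
        heapq.heappush(slots, start + d)
    return max(slots)

# One 5-second straggler per chunk of 4 short tasks.
durations = [1.0, 1.0, 1.0, 5.0] * 3

print(chunked_makespan(durations, 4))    # 15.0: every chunk waits on its straggler
print(streaming_makespan(durations, 4))  # 8.0: idle slots are backfilled immediately
```

With fixed chunks, the three fast tasks in each chunk sit idle while the straggler finishes; backfilling keeps all N slots busy the whole time.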
My current workaround is to implement this myself with ray.wait and a queue (I use a PriorityQueue to preserve order). Is there a better way?
```python
import queue

import ray

@ray.remote
def f(index, d):
    result = compute(d)
    return (index, result)

parallelism = 100
data = [x for x in range(1000)]

finished_results_q = queue.PriorityQueue()
in_progress_runs = set()
index = 0
while index < len(data):
    if len(in_progress_runs) < parallelism:
        in_progress_runs.add(f.remote(index, data[index]))
        index += 1
        continue
    # Block until at least one task finishes; ray.wait returns a *list*
    # of ready refs, so unpack it rather than treating it as a single ref.
    ready, _ = ray.wait(list(in_progress_runs), num_returns=1)
    for ref in ready:
        finished_results_q.put(ray.get(ref))
        in_progress_runs.remove(ref)

# Collect the remaining in-flight tasks.
for result in ray.get(list(in_progress_runs)):
    finished_results_q.put(result)

# Drain with get() so results come out sorted by index; reading
# finished_results_q.queue directly would give heap order, not sorted order.
results = [finished_results_q.get() for _ in range(finished_results_q.qsize())]
```
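For context, the reason I put (index, result) tuples into the PriorityQueue is that draining it with get() returns results sorted by index even when they arrive out of order; reading the queue's internal list directly would not. A tiny standalone demo with toy data (no Ray involved):

```python
import queue

pq = queue.PriorityQueue()
# Results arriving out of order, as (index, result) tuples.
for item in [(2, "c"), (0, "a"), (1, "b")]:
    pq.put(item)

# The internal .queue attribute is a binary heap, NOT a sorted list,
# so it is generally not in index order.
snapshot = list(pq.queue)

# Draining with get() yields items in priority (index) order.
ordered = [pq.get()[1] for _ in range(3)]
assert ordered == ["a", "b", "c"]
```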