Using a subset of available CPUs

I’m having difficulty figuring out a good way to use only a subset of the CPUs in an established cluster in Ray 1.0.1. I don’t want to tear down and restart my cluster every time the user requests a parallel operation that needs fewer CPUs than the total available.

For example, an easy solution would be something like this, if it existed:
ray.init(num_cpus=100)
results = ray.get(jobs, processes=25) # use 25 out of 100 CPUs

But since ray.get() doesn’t take a processes argument, I tried this:
ray.init(num_cpus=100)
pool = ray.util.multiprocessing.Pool(processes=25, ray_address='auto')
results = pool.starmap(f, args_list)
del pool

But in my setup the pool implementation is barely faster than single-process execution, and it also produces a lot of crash messages, some related to memory shortages and some not (plain ray.get() doesn’t have these performance and crashing issues for me).

I also chunked the list of jobs and called ray.get() on each chunk, but this used more CPUs than the chunk size, so it didn’t work to limit CPU utilization.
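Roughly, the pattern I tried was something like this; every task is scheduled the moment .remote() is called, so batching the ray.get() calls afterwards doesn’t cap how many run at once:

ray_f = ray.remote(f)
jobs = [ray_f.remote(*args) for args in args_list]  # every task is scheduled here
job_chunks = [jobs[i : i + 25] for i in range(0, len(jobs), 25)]
results = []
for job_chunk in job_chunks:
    # This only batches the waiting; Ray already started running
    # all of the tasks when .remote() was called above.
    results.extend(ray.get(job_chunk))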

Is there a different way to do this that doesn’t require the pool interface?

Thanks
John

This workaround for limiting CPU usage worked. You need to chunk the argument list, not the futures/jobs list.

import ray

def f(a, b):
    # CPU-bound busy loop so each task visibly occupies a core.
    x = 0
    for i in range(100000000):
        x += 1
    return a + b

ray_f = ray.remote(f)

n_jobs = 100
chunk_size = 25

args_list = [(i, 2 * i) for i in range(n_jobs)]
args_chunks = [args_list[i : i + chunk_size] for i in range(0, n_jobs, chunk_size)]

results = []
for args_chunk in args_chunks:
    # Submit only chunk_size tasks at a time, so at most chunk_size
    # CPUs are busy before the next chunk is submitted.
    job_chunk = [ray_f.remote(*args) for args in args_chunk]
    results.extend(ray.get(job_chunk))

That’s right. If you want to use only 25 CPUs at a time, one way to do that is to submit 25 tasks at a time, batching the arguments into chunks of 25 like you’re doing.
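One drawback of chunking is that each batch waits for its slowest task before the next batch is submitted. If you want to keep 25 tasks in flight continuously, a sliding window with ray.wait is another option. Here’s a minimal sketch, reusing the ray_f and args_list definitions from above; note that results come back in completion order rather than submission order:

import ray

max_in_flight = 25  # cap on concurrently running tasks

in_flight = []
results = []
for args in args_list:
    if len(in_flight) >= max_in_flight:
        # Block until at least one task finishes, then collect its result.
        done, in_flight = ray.wait(in_flight, num_returns=1)
        results.extend(ray.get(done))
    in_flight.append(ray_f.remote(*args))

# Collect whatever is still running.
results.extend(ray.get(in_flight))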