Rebalancing tasks dynamically

I have a cluster with mixed CPU types and a simple compute job that I submit to it.
In total I have 64 CPU resources. When the job starts, the work is allocated, or rather queued, in a pool. But when a machine with a certain CPU type finishes early, the remaining work is not allocated to its idling CPUs; it just waits for the other resources to finish. Nothing is rebalanced automatically; the pool splits the job up front and posts it.

async_results = [pool.apply_async(launch_compute, args=(data,)) for data in data_array]
results = [ar.get() for ar in async_results]

Is there any configuration that makes the cluster more efficient, so that it dynamically finds CPUs that have already finished their work and feeds the pending jobs to them?

Running on bare machines is much faster if I distribute the work manually over plain sockets.

What are your job and cluster configs?

5 machines, and the job is just a Hello World type of thing plus some calculation to keep the CPUs busy. I was trying to see how the load gets balanced across CPUs; it is really just a getting-started example. The array has about 500 values, which means it creates 500 independent jobs. Is there a configuration that distinguishes concurrency from parallel execution?

The autoscaler and scheduler will orchestrate that for you automatically; you just have to define, per task/actor, how many CPUs and how much memory you want to allocate to it, and do the same at the Ray cluster level.

You can then tune accordingly: buffering and queueing are provided out of the box.
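For example, a minimal sketch with plain Ray tasks, reusing the names from your snippet (num_cpus=1 and the square calculation are just placeholders):

import ray

ray.init(address="auto")  # connect to the existing cluster

# Each task requests one CPU; the scheduler keeps feeding pending tasks
# to whichever node has a free CPU, regardless of CPU type.
@ray.remote(num_cpus=1)
def launch_compute(data):
    return data * data  # placeholder for the real calculation

data_array = list(range(500))
refs = [launch_compute.remote(data) for data in data_array]
results = ray.get(refs)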

Well, I did change the code to create multiple tasks, and that part seems to be taken care of. But I am running into another problem. The tasks are distributed across 100+ CPUs, but file handles are NOT being released, even though I explicitly clean up the files in Python. If I complete the tasks, shut down the Ray cluster, and then repeat the whole batch of tasks, it works fine. But if I create, say, 120 tasks, keep them alive, and feed data to them continuously, the file handles keep piling up. If I instead start 120 tasks, send one batch, shut them down, and initiate another batch of tasks, it works fine. Is there something I am missing in the configuration?
Initiating tasks and scaling for each batch is a performance bottleneck for me. I want to initiate the tasks once and keep processing data with them, instead of handling one batch of data at a time, because of the overhead of autoscaling, or rather the time it takes to initiate and scale.
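For reference, this is roughly the shape of what I am running (heavily simplified; the class, method, and file names are made up for illustration):

import ray

ray.init(address="auto")

@ray.remote(num_cpus=1)
class Processor:
    def process(self, path, payload):
        # open/read/write inside a context manager so the handle is
        # released even if the calculation raises
        with open(path, "a") as f:
            f.write(str(payload) + "\n")
        return len(str(payload))

# start the workers once and keep them alive
workers = [Processor.remote() for _ in range(120)]

# keep feeding batches of data to the same long-lived workers
for batch_id in range(10):
    refs = [w.process.remote(f"/shared/out_{i}.txt", batch_id)  # files live on the shared drive
            for i, w in enumerate(workers)]
    ray.get(refs)

Even with the explicit cleanup in the with block, the handle count on the workers keeps growing over time.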

[psutil.Process(pid=34240, name='ray::IDLE', status='disk-sleep', started='15:12:06'), psutil.Process(pid=34024, name='ray::processor', status='disk-sleep', started='15:12:06')] will be forcefully terminated.

But they never get terminated.

Like you basically want a long running Ray Cluster and to regularly submit batches of Jobs to it in waves correct?
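If so, you can keep the cluster up and push each wave through the Jobs API; roughly like this (the entrypoint script name is just a placeholder):

from ray.job_submission import JobSubmissionClient

# point the client at the long-running cluster's dashboard address
client = JobSubmissionClient("http://127.0.0.1:8265")

# submit one wave; the cluster stays up between submissions
job_id = client.submit_job(
    entrypoint="python process_batch.py",  # placeholder script
    runtime_env={"working_dir": "."},
)
print(client.get_job_status(job_id))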

Yes. I want to start the processes, keep them assigned, and send them batches of data to be processed. If I do that on individual machines in plain Python, sending the data directly or having a config file for these batch processes to pick up, everything works fine. I changed the same code to run as Ray tasks, using an Actor class to share objects, but it has a big problem with file locking. Each of those 120 processes opens, reads, and writes files. I am not sure what is going on there in the Ray code.

[psutil.Process(pid=34240, name='ray::IDLE', status='disk-sleep', started='15:12:06'), psutil.Process(pid=34024, name='ray::processor', status='disk-sleep', started='15:12:06')] will be forcefully terminated.

This shows up if I try to terminate the hung Ray tasks with ray stop, but they never actually get terminated. I had to restart the machine hosting the shared drive where these files reside to kill the hung tasks on the Ray node.
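One way to see the piled-up handles on a worker node is to inspect the ray:: worker processes with psutil (illustrative snippet; it just counts open files per worker):

import psutil

# count open file handles for every Ray worker process on this node
for p in psutil.process_iter(["pid", "name"]):
    name = p.info["name"] or ""
    if name.startswith("ray::"):
        try:
            print(p.info["pid"], name, len(p.open_files()))
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            pass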

Not sure if Dispy is worth spending some time on. I just want to use a framework rather than custom code. Oh well…