Ray Data Performance Issues

Hi Ray team,

We are trying to:

  • Load a large dataset (4+ billion rows)
  • Shard it across a Ray cluster
  • Perform a task (like executing a query) on each shard
  • Get the results from each shard
  • Combine the results

To achieve this, we have tried the following so far:

  • Cluster config: 4 nodes (16 CPU cores, 64 GB memory, and 1 TB of object store)
  • Stateful actors for data loading, attached to a placement group with a STRICT_SPREAD strategy, guaranteeing one worker per shard (see the sketch after this list)
  • Execute the task (an actor method) on the same worker
  • Run another Ray task to collect the results
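
For concreteness, here is a minimal sketch of what we have right now (using the scheduling_strategy API; ShardWorker, load, and execute are placeholder names, and the real loading and query logic are elided):

import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init(address="auto")

# One bundle per shard; STRICT_SPREAD places each bundle on a different node.
bundles = [{"CPU": 1}] * 4
pg = placement_group(bundles, strategy="STRICT_SPREAD")
ray.get(pg.ready())

@ray.remote(num_cpus=1)
class ShardWorker:
    def load(self, shard_id):
        # load this shard's slice of the dataset (loading logic elided)
        self.shard_id = shard_id

    def execute(self, query):
        # run the query against the local shard and return a partial result
        return f"partial result for shard {self.shard_id}"

# One actor per bundle, pinned to its bundle (and therefore to its node).
workers = [
    ShardWorker.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg, placement_group_bundle_index=i,
        )
    ).remote()
    for i in range(len(bundles))
]
ray.get([w.load.remote(i) for i, w in enumerate(workers)])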

Questions:

  1. With the following placement_group settings:

bundles = [{"CPU": 1}, {"CPU": 1}, {"CPU": 1}, {"CPU": 1}]
strategy = "STRICT_SPREAD"
num_cpus = 4

Is this the optimal configuration? How do we utilize all of the available cores?
Currently it is not clear whether the configuration above achieves that.
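
(For context, we have been checking utilization roughly like this; we may well be misreading these numbers:)

import ray

ray.init(address="auto")
print(ray.cluster_resources())    # total resources registered with the cluster
print(ray.available_resources())  # resources not currently claimed by tasks/actors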

  2. Will bumping up the CPUs in each bundle, as below, help?

bundles = [{"CPU": 2}, {"CPU": 2}, {"CPU": 2}, {"CPU": 2}]
strategy = "STRICT_SPREAD"
num_cpus = 8
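
If so, we assume the actors would also have to request the larger allocation in order to actually claim it, something like this (continuing from the sketch above, same imports and ShardWorker; this is our guess, not something we have verified):

bundles = [{"CPU": 2}] * 4
pg = placement_group(bundles, strategy="STRICT_SPREAD")
ray.get(pg.ready())

workers = [
    ShardWorker.options(
        num_cpus=2,  # match the bundle size so each actor can claim both cores
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg, placement_group_bundle_index=i,
        )
    ).remote()
    for i in range(len(bundles))
]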
  3. We observe slowness while collecting the results.

@ray.remote
def collect_results(*shard_results):
    # code to collect the results from each shard
    ...

How does Ray ensure that this task utilizes all of the available cores on the worker it runs on?
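
For reference, we invoke it roughly like this (query is a placeholder, and workers and collect_results are as in the sketches above):

query = "..."  # placeholder for the actual query
partial_refs = [w.execute.remote(query) for w in workers]

# Ray resolves the ObjectRefs before collect_results starts, so the task
# receives the materialized per-shard results as its arguments.
final = ray.get(collect_results.remote(*partial_refs))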

Please review and provide your feedback on this.

Thanks,
Preeti

@Alex, @Clark_Zinzow, @sangcho, can you please share your feedback/pointers on the performance issues mentioned above? Any guidance on tuning performance would be very helpful.