Ray exec multiple scripts w/ tune.run() to same ray cluster

I have a need to execute 100s of different model training scripts. Training each model requires a data pull from Snowflake and a tune.run() with Ax bayesopt for 15-20 samples.

My original thought was to ray exec config.yaml train_model.py --arg1 … --arg to the same cluster. However, each time I ray exec and the cluster autoscales, any existing tune.run() starts sending its trials to the new GPU workers (see image).

This is causing two issues:

  1. Costly calls to the DB mid-run significantly slow everything down.

  2. I believe it is filling up worker memory, causing our nodes to be marked dead.

The code running this is proprietary and I haven’t had time to create a reproducible example yet. I am wondering if anyone has similar use cases and any recommendations? Any other best practices I should be aware of?

Right now I am considering using the --start setting & the Ray object store to create a mini-cluster for each of the different model training scripts.

Both head & worker are running:
rayproject/ray-ml:1.1.0 container on the AWS Deep Learning Base AMI (Ubuntu 18.04)

Hi @nateewall, do I understand correctly that you’re submitting multiple tasks to the same cluster from different processes?

This seems to be a multi-tenancy problem. Generally, Ray does not yet support multi-tenancy, especially not with respect to the autoscaler. I.e., if the autoscaler adds new nodes to Ray, there is no way for the separate Tune processes to know which resources belong to which task, so they just take them over on a first-come, first-served basis.

As I said, I don’t think we currently support multi-tenancy at all, but cc @Alex, who might know more about this.

I’d suggest two ways you could try to mitigate these problems. First, since you’re using the autoscaler anyway, you could start separate clusters for the different tuning runs. Second, if you would really like to explore submitting tasks to the same cluster at the same time, you can try Ray Tune’s concurrency limiter to limit the number of parallel trials for each tuning run - that way existing runs will not take over additional resources that become available after scaling up.
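Roughly, the second option could look like this (just a sketch, assuming an Ax searcher; MyTrainable, the metric/mode, and config are placeholders for your own setup):

from ray import tune
from ray.tune.suggest import ConcurrencyLimiter
from ray.tune.suggest.ax import AxSearch

# Limit each tune.run() to a single in-flight trial so it cannot grab
# resources the autoscaler adds for other runs.
search_alg = ConcurrencyLimiter(
    AxSearch(metric="val_loss", mode="min"),  # placeholder metric/mode
    max_concurrent=1,
)

analysis = tune.run(
    MyTrainable,             # placeholder trainable
    search_alg=search_alg,
    num_samples=20,
    config=config,           # search space assumed to be defined here
)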

Thanks @kai! Based on all the docs this seemed outside the intended use case so appreciate your feedback.

I have been using the concurrency limiter with max_concurrent=1, and I was finding that when submitting a new run to the same cluster, the new run would often take over an existing resource, and the existing run would wait for scaling and then move its trials to the new resource. If that is not expected, I can try to recreate it with the MNIST example this weekend.

Right now I am thinking about launching a new cluster for each of the training processes. Do you have any insight into best practices for accessing training data across the different workers? Should I be using the Ray object store and passing that to the run, or should I continue with my current method of pulling the data within the trainable class itself?

Hey, sorry, I might be missing something here, but from my naive perspective it sounds like the real goal is to stop a single Tune job from hogging all the resources on the cluster, right?

@Alex I am setting max_concurrent=1 for each tune.run(), so they are not hogging all the resources. Rather, launching new runs causes the next trial of an existing run to move to a different worker node. This causes an expensive call to the database to get the data onto that worker node.

What I was hoping for was a setting that would allow each tune.run() to stay on the same worker node throughout the entire run, to reduce calls to the database.

Based on feedback above I am now just investigating how to efficiently start and tear down a small cluster for each process separately.

Thanks for the clarification. It sounds to me like this isn’t really a multi-tenancy issue, but rather that Tune sometimes migrates a run to different nodes between trials.

@kai do you know if placement groups for tune solve this issue?

I tried to tinker with the changes merged from @kai in [tune] placement group support by krfricke · Pull Request #13370 · ray-project/ray · GitHub, like so:

from ray import tune
from ray.util.placement_group import placement_group

def placement_group_factory():
    # Two bundles packed onto the same node (STRICT_PACK): a small CPU bundle plus a GPU bundle.
    head_bundle = {"CPU": 1, "GPU": 0, "custom": 0}
    gpu_bundle = {"CPU": 4, "GPU": 1}
    return placement_group([head_bundle, gpu_bundle], strategy="STRICT_PACK")

tune_analysis = tune.run(
    SequenceTrainable,
    queue_trials=True,
    num_samples=10,
    search_alg=search_alg,
    resources_per_trial=placement_group_factory(),
    config=config,
    reuse_actors=True,
)

However, I could not get it working in the ray-ml:nightly container…

Hi @nateewall, you need to pass resources_per_trial=placement_group_factory (without the () brackets). Can you try that?

@kai,
Yes, that gets it running, but if I run it as set up above I get:

RuntimeError: No CUDA GPUs are available

If I run it passing just the GPU bundle to the factory, I get:

File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/utils.py", line 308, in resources_from_resource_arguments
    raise ValueError("The resources dictionary must not "
ValueError: The resources dictionary must not contain the key 'CPU' or 'GPU'
2021-01-26 11:45:26,221 INFO trial_runner.py:953 -- Blocking for next trial...

Just to be sure, this is the output of

tune_analysis = tune.run(
    SequenceTrainable,
    queue_trials=True,
    num_samples=10,
    search_alg=search_alg,
    resources_per_trial=placement_group_factory,
    config=config,
    reuse_actors=True,
)

?

I have some logging & checkpoint options set but that is the meat of it.

@nateewall sorry for following up on this late. I’d like to help you resolve your issue:

Unfortunately, the placement group suggestion will likely not solve the issue for you.

Can you tell me more about your setup – specifically:

  1. Are you using some form of cluster manager (like Kubernetes)?
  2. How much data are you reading from Snowflake, and how long does each data read take?
  3. Do you have any security constraints?

@rliaw, no worries! And yes, after spending a day with the placement groups it didn’t seem like I was going to get that working.

  1. I am still developing the optimization process, so I have not put in a cluster manager. I am just using a simple script that executes multiple processes and still requires babysitting to avoid my AWS account’s spot instance limits:
    ray exec --run-env docker --start -n model-id config.yaml 'python optimize.py [--args]'

  2. The Snowflake data varies from about 100MB up to <5GB, so the _load_data() call in the trainable takes anywhere from ~1 min up to ~15 min based on what I have observed (but have not measured). For reference, the _load_data() in the trainable does the following (roughly sketched in the snippet after this list):

  • queries the data from Snowflake
  • does some preprocessing
  • generates a PyTorch dataset
  • dumps the dataset to a pickle on the worker, so subsequent trials avoid the Snowflake call
  • creates the DataLoader

  3. For now let’s assume we have no constraints; if you have specific questions, perhaps we can chat on Slack.
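For reference, roughly what that _load_data() caching looks like (a simplified sketch; the query helper, preprocessing function, and dataset class names are hypothetical):

import os
import pickle

from torch.utils.data import DataLoader

CACHE_DIR = "/tmp/dataset_cache"  # hypothetical node-local cache location

def _load_data(model_id, batch_size):
    """Build the DataLoader, reusing the pickled dataset if this node already has it."""
    os.makedirs(CACHE_DIR, exist_ok=True)
    cache_path = os.path.join(CACHE_DIR, f"{model_id}.pkl")

    if os.path.exists(cache_path):
        # Subsequent trials on this node skip the Snowflake query entirely.
        with open(cache_path, "rb") as f:
            dataset = pickle.load(f)
    else:
        df = query_snowflake(model_id)              # hypothetical query helper
        dataset = SequenceDataset(preprocess(df))   # hypothetical preprocessing + Dataset
        with open(cache_path, "wb") as f:
            pickle.dump(dataset, f)

    return DataLoader(dataset, batch_size=batch_size, shuffle=True)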

Thanks for your help and any recommendations you may have!

Hey @nateewall thanks a bunch for following up!

Is it possible to cache the data on the node? Do all trainables load the same data?

Regarding your initial use case:

I have a need to execute 100s of different model training scripts. Training each model requires a data pull from Snowflake and a tune.run() with Ax bayesopt for 15-20 samples.

Can you tell me more about this? What are the differences between the model training scripts? Is it possible to bake it all into 1 Tune run?

Sorry for so many questions! I’d like to get all the context I can before providing a suggestion :slight_smile:

I do pickle and cache the PyTorch datasets on the node after the initial query, so within one script’s tune.run() all the actors are using the same data. The issues come when I am trying to scale: if a run moves to a new worker node, it has to query the data again and cache it there.

The script is mostly the same for training each model. The only difference is in the data we pull. We change this through a few args we pass in when executing the model training script that represent the distinct subset of the data we want to query. If you think there may be a way that makes sense to get this into one Tune run, then I am open to giving it a try.

Sincerely appreciate your help, so feel free to keep the questions coming.

OK got it. Do multiple tune.run calls ever share the same data?

My understanding of your problem is that per tune.run call (each distributed hyperparameter sweep), you’re seeing data being downloaded on each node, which is costly and time-consuming.

Hmm, what if you run multiple Ray clusters, but with the same underlying EFS instance shared across all of them? This makes sure that the data is cached across all workers, so only one node will need to download it at a time (and it persists for the entirety of your training).
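For example (a sketch only, assuming the EFS volume is mounted at the same path, say /mnt/efs, on every node; the path and helper name are assumptions), the trainable’s cache could simply point at the shared mount instead of node-local disk:

import os
import pickle

CACHE_DIR = "/mnt/efs/dataset_cache"  # assumed shared EFS mount point

def load_or_build_dataset(model_id, build_fn):
    """Return the dataset for model_id from the shared cache, building it at most once."""
    os.makedirs(CACHE_DIR, exist_ok=True)
    cache_path = os.path.join(CACHE_DIR, f"{model_id}.pkl")
    if os.path.exists(cache_path):
        with open(cache_path, "rb") as f:
            return pickle.load(f)
    dataset = build_fn()  # e.g. the Snowflake query + preprocessing from _load_data()
    with open(cache_path, "wb") as f:
        pickle.dump(dataset, f)
    return dataset

In a real version you’d want atomic writes (write to a temp file and rename) or a lock file, since multiple clusters could try to populate the same cache entry concurrently.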

Alternatively, we can think about how to keep the data in memory (perhaps with a large memory instance and spilling to disk when necessary), but I’m not sure if we need that approach yet.

Yep you nailed it!

Going to mess around with this approach this week, starting from the EFS config example in the docs :slight_smile:

Appreciate your help and will let you know how this works out!

Awesome! Keep me updated on this.