Ray.put from Ray task - local or not

Assume my code, deployed as a Ray task, contains the statement ray.put('abc'). Is "abc" persisted in the Plasma store on the same worker node my task is running on, or is ray.put('abc') directed to the cluster head node, which decides which Plasma store to use across the entire cluster, so the object will most likely be persisted on another worker?

The Plasma store is "local only": objects are only created locally, and they are transferred across the cluster when they are needed by others. Note that there can be many copies of the same object. That means:

ray.put → the object is created in the local plasma store of the node where the caller of ray.put runs.
task's return object → the object is created in the local plasma store of the node where the task is executed.
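
Here is a minimal runnable sketch of both cases (the function name and string values are just illustrative, not from this thread):

import ray

ray.init()

# Case 1: ray.put from the driver -- the object is created in the plasma
# store of the node where this driver process runs.
driver_ref = ray.put("driver-side data")

@ray.remote
def make_object():
    # Case 2: a task's return value is stored in the plasma store of
    # whichever node ends up executing the task.
    return "task-side data"

task_ref = make_object.remote()

# ray.get pulls a copy to the caller's node if the object is not already local.
print(ray.get(driver_ref), ray.get(task_ref))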

So if a task creates some objects that are stored in the local Plasma, does the Global Catalog Store also contain pointers to those objects? If yes, does a local ray.put also update the global catalog?

Yes. The global catalog is updated by the raylet after the object is created.
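
To illustrate why that matters (a sketch, not code from this thread): a ref created on one node can be resolved by a task running on any other node, because the object's location is looked up through that catalog and the object is pulled over on demand.

import ray

ray.init()

# The raylet registers this object's location in the cluster-wide object
# directory once it is created in the local plasma store.
ref = ray.put({"payload": 42})

@ray.remote
def consume(obj):
    # If this task lands on another node, Ray resolves the ref through the
    # object directory and pulls a copy into that node's plasma store first.
    return obj["payload"]

print(ray.get(consume.remote(ref)))  # -> 42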

Thanks a lot for the information. Assume I have a Ray cluster with 2 workers. If I write a program and run it on my laptop:

import ray

ray.init(address="<IP of my Ray cluster>:6379")
obj_ref = ray.put(mydata)

This code is not executed on a worker, right? Does that mean, in this case, ray.put() will go to the HEAD node, which will choose an arbitrary Plasma store on one of the workers to store mydata?

The Ray head node is identical to the worker nodes except that it has a couple of additional components that manage the cluster. So the object will just be stored in the head node's plasma store.
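
For example (a sketch that assumes the driver script itself runs on the head node of an existing cluster; the address and port are placeholders):

import ray

# Connect to an existing cluster; this driver process is part of the cluster.
ray.init(address="<head-node-ip>:6379")

# ray.put creates the object in the plasma store of the node running this
# driver -- in this setup, the head node.
ref = ray.put("mydata")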

If I want a task to write data into the local plasma store on the worker node where the task is executed, is this the correct code?

import ray

@ray.remote
def my_func(val):
    return ray.put(val)

ray.init()
future = my_func.remote("my-test-data")
object_id = ray.get(future)

Now object_id contains the ID of the object that was created in the local plasma store where my_func executed. Is this correct?

Yes, you are right! Btw, calling it object_ref is more accurate there. So your driver now has a ref to the plasma object on a worker node, and it is only pulled to the driver node when you call ray.get on the ref!

import ray

@ray.remote
def my_func(val):
    return ray.put(val)  # Put the object on the node this task runs on.

ray.init()
future = my_func.remote("my-test-data")
object_ref = ray.get(future)  # You only have a reference to an object on a worker node now.
result = ray.get(object_ref)  # This will start pulling the object from the worker node to the driver node.

@sangcho In the context above…

import ray

@ray.remote
def my_func(val):
    return ray.put(val)  # Put the object on the node this task runs on.

ray.init()
future = my_func.remote("my-test-data")

my_func.remote() is submitted to the local node where this code is executed. If the local node gets fully utilized, will the autoscaler submit my_func to different nodes?

Functions are submitted to remote nodes if the local node's resources (e.g., num_cpus) are fully utilized. But if the local node cannot create an object because the plasma store is full, it will just raise an OOM error. From Ray 1.3 (which will be released in 2~3 weeks), object spilling will be used by default, so objects will be spilled to disk if there is no space left in the local object store (Memory Management — Ray v1.2.0).
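
A sketch of both points, with placeholder values (the _system_config keys follow the Ray 1.2-era object spilling docs and are assumptions here, not something from this thread):

import json
import ray

# Enable object spilling to local disk explicitly (before Ray 1.3 turns it
# on by default). The spill directory is just a placeholder.
ray.init(
    _system_config={
        "automatic_object_spilling_enabled": True,
        "max_io_workers": 2,
        "object_spilling_config": json.dumps(
            {"type": "filesystem", "params": {"directory_path": "/tmp/ray_spill"}}
        ),
    }
)

# Resource requests drive scheduling: if the local node's CPUs are busy, the
# raylet schedules this task on another node that has 2 free CPUs.
@ray.remote(num_cpus=2)
def my_func(val):
    return ray.put(val)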

Also note that the autoscaler doesn't submit functions to other nodes. That is what the raylet does, and Ray doesn't have a centralized scheduler (it uses a decentralized protocol for scalability). You can check out the whitepaper for more internal details about Ray! Ray Whitepapers — Ray v1.2.0