Ray.put from Ray task - local or not

Assume my code deployed as a Ray task, contains statement ray.put('abc') . Does “abc” persisted in the Plasma at the same worker node that my task is running or ray.put('abc') directed to the cluster head node that decides which Plasma to use in the entire cluster and then most likely will be persisted on the other worker?

Plasma store is “local only”. So objects are only created locally, and they are transferred across the cluster when they are needed by others. Note there could be many copies of the same objects. That says;

ray.put → object is created in the local plasma store where the caller of ray.put exists.
task's return object → object is created in the local plasma store where the task is executed.

So if task creates some objects that stored in local Plasma, does Global Catalog Store also contains pointers to those objects? If yes, does local ray.put also update global catalog?

Yes. Global catalog is updated by raylet after the object is created.

thanks a lot for the information. Assume I have a Ray cluster with 2 workers. If i write a program and run it on my laptop

import ray

ray.init(IP of my Ray cluster)
id = ray.put(mydata)

This code is not executed on the worker, right? Does it means, in this case, ray.put() will get to HEAD node, that will choose arbitrary Plasma on one of the workers to store mydata?

Ray head node is identical to worker nodes except that it has a couple more important components to manage the cluster. So, it will be just stored in a head node plasma store.

If i want a task to write data into local plasma on the worker node where the task is executed. Is this correct code?

def my_func(val)
  return ray.put(val)

future = my_func.remote('my-test-data")
object_id = ray.get(future)

Now object_id contains ID of the object that was created in the local plasma where my_func executed. Is this correct?

Yes, you are right! Btw, calling it object_ref is more accurate there. So your driver now will have a ref to the plasma object in a worker node, and it is only pulled to the driver node only when you call ray.get on the ref!

def my_func(val)
  return ray.put(val) # Put the object on a node this task runs on.

future = my_func.remote('my-test-data") is 
object_ref = ray.get(future) # you only have a reference to an object in a worker node now.
result = ray.get(object_ref) # This will start pulling the object from the worker node to a driver node.