Running methods with actors is slower than running normal methods

Hey, I have been trying to use Ray in my project to process data from three sources at the same time. I made it to the point of running methods that receive objects, but sadly my object uses S3 functionality to download files, and a TypeError: can't pickle SSLContext objects error appears.

This is the code I’m using for that (it raises the error when I pass an S3 instance):
import time

import ray

ray.init()
print(ray.available_resources())

@ray.remote
def f1(object_, x):
    time.sleep(5)
    return [i for i in range(100*x)]

@ray.remote
def f2(object_, x):
    time.sleep(5)
    return [i for i in range(100*x)]

@ray.remote
def f3(object_, x):
    time.sleep(5)
    return [i for i in range(100*x)]

class test:
    name = "hey"
    apellido = "bye"
    
    @ray.remote
    def f(self, x):
        return x * x
 
    def run_multiprocess(self):
        futures = [f1.remote(self, 20), f2.remote(self, 10), f3.remote(self, 5)]
        a, b, c = ray.get(futures)  # three lists of 2000, 1000 and 500 ints

t = test()
t.run_multiprocess()

This code executes in 9 seconds (kind of expected, since the three 5-second sleeps overlap). I don’t know why this took 4 seconds to set up everything.

After this I tried another approach, using actor methods:

import time

import ray

ray.init()

@ray.remote
class TrainInputs:

    @ray.method(num_returns=1)
    def f1(self, x):
        time.sleep(5)
        return [i for i in range(100*x)]

    @ray.method(num_returns=1)
    def f2(self, x):
        time.sleep(5)
        return [i for i in range(100*x)]

    @ray.method(num_returns=1)
    def f3(self, x):
        time.sleep(5)
        return [i for i in range(100*x)]

class Train():
    t = TrainInputs.remote()
    def get_inputs_train(self):
        
        print(ray.available_resources())
        futures = [self.t.f1.remote(20), self.t.f2.remote(10), self.t.f3.remote(5)]
        a, b, c = ray.get(futures)

        
train = Train()
train.get_inputs_train()

ray.shutdown()

But this execution takes 19.7 seconds, so the three calls are clearly not running in parallel and we don’t see any improvement.

I also tested an async approach, but since we are not working with any additional library (all of this is manual), the GIL will not be bypassed.

Any ideas? Am I doing anything wrong?

The reason the actor version doesn’t show an improvement is that you are using a single actor. An actor can process one message at a time (unless you use an async actor), so your futures are not executed concurrently.

I don’t know why this took 4 seconds to set up everything.

Can you elaborate a bit more about this? What do you mean by setup here?

I made it to the point of running methods that receive objects, but sadly my object uses S3 functionality to download files, and a TypeError: can't pickle SSLContext objects error appears.

This is because some objects are implicitly captured, and part of what is captured is not serializable. This can happen with things like locks. The SSLContext is not serializable, which makes sense: it would be meaningless to move an SSL context to another machine or process.

To dig into the issue, you can take a look at this tool: Serialization — Ray v2.0.0.dev0
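The underlying limitation can be reproduced without Ray. This small sketch uses the standard library's `pickle` as a stand-in; Ray serializes with cloudpickle rather than plain `pickle`, but an SSL context fails in the same way with both.

```python
import pickle
import ssl

# An SSL context wraps native state that cannot be copied to another process.
ctx = ssl.create_default_context()

try:
    pickle.dumps(ctx)
    picklable = True
except (TypeError, pickle.PicklingError) as exc:
    picklable = False
    print(f"pickling failed: {exc}")
```

Any object that holds such a context as an attribute (an S3 client, for instance) inherits the same problem when it is passed to or captured by a remote function.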

Hey,

The reason the actor version doesn’t show an improvement is that you are using a single actor. An actor can process one message at a time (unless you use an async actor), so your futures are not executed concurrently.

As I understand it, using async will not bypass the GIL, and the code will run as normal without any speed improvement. Am I right?

was found to be non-serializable. There may be multiple other undetected variables that were non-serializable.
Consider either removing the instantiation/imports of these variables or moving the instantiation into the scope of the function/class.
If you have any suggestions on how to improve this error message, please reach out to the Ray developers on github.com/ray-project/ray/issues/

This is what I see at your link, so the idea would be to move the download functionality to another class function, not the one with the decorator. Did I understand properly?

@sangcho,

I have a clarification question on this. For a Ray actor, is it one message at a time per method or per object?

For example, if I have an actor like this:

@ray.remote
class Actor:
    def method_a(self): ...
    def method_b(self): ...

# and a call like this:
obj = Actor.remote()
a = [obj.method_a.remote(), obj.method_b.remote()]

Is there a chance that method_a and method_b would run concurrently, or are we guaranteed that one of them would run to completion before the other? I don’t care about the order they run in here, just whether or not they could run at the same time.

As I understand it, using async will not bypass the GIL, and the code will run as normal without any speed improvement. Am I right?

It depends on your workload. If you have an I/O-intensive workload, asyncio can run operations concurrently using the event loop. If it is purely compute-intensive, as you mentioned, there won’t be any performance improvement.
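To illustrate the I/O-bound case, here is a small standard-library sketch (no Ray involved). The coroutine `fake_download` is hypothetical, and `asyncio.sleep` stands in for waiting on a network response; the recorded events show that all three operations start before any of them finishes.

```python
import asyncio

events = []


async def fake_download(name, delay):
    events.append(f"start {name}")
    await asyncio.sleep(delay)  # yields to the event loop, like a network wait
    events.append(f"end {name}")
    return name


async def main():
    # gather schedules all three coroutines and returns results in call order.
    return await asyncio.gather(
        fake_download("a", 0.2),
        fake_download("b", 0.1),
        fake_download("c", 0.05),
    )


results = asyncio.run(main())
print(events)
```

If the body of `fake_download` were a CPU-bound loop instead of an awaitable wait, the coroutines would run one after another, because nothing would hand control back to the event loop.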

This is what I see at your link, so the idea would be to move the download functionality to another class function, not the one with the decorator. Did I understand properly?

Yes! Let me give you an example.

a = obj  # imagine this is an unserializable object

@ray.remote
class A:
    def __init__(self):
        self.a = a  # here, a is implicitly captured, so it has to be serializable

# To avoid the implicit capture, do the following instead:

@ray.remote
class A:
    def __init__(self, a):
        self.a = a

actor = A.remote(a)  # now a is passed explicitly, so it doesn't have to be implicitly captured
# Note: arguments are serialized as well, so an object that cannot be pickled
# at all has to be created inside __init__ instead.
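For an object that cannot be pickled at all, such as an SSLContext, a common pattern is to ship only plain configuration and build the object in the process that uses it. The sketch below shows the pattern with the standard library's `pickle` as a stand-in for Ray's serialization; the class `LazyDownloader` and the bucket name are hypothetical, and in a Ray actor the equivalent would be to create the client inside `__init__`.

```python
import pickle
import ssl


class LazyDownloader:
    """Hypothetical helper: stores picklable config, builds the SSL context on demand."""

    def __init__(self, bucket):
        self.bucket = bucket
        self._ctx = None  # created lazily in whichever process uses it

    def context(self):
        if self._ctx is None:
            self._ctx = ssl.create_default_context()
        return self._ctx

    def __getstate__(self):
        # Drop the live context so only plain config crosses the process boundary.
        state = self.__dict__.copy()
        state["_ctx"] = None
        return state


downloader = LazyDownloader("my-bucket")
downloader.context()  # a live context now exists locally

# The round trip succeeds because the context is excluded from the pickled state.
clone = pickle.loads(pickle.dumps(downloader))
print(clone.bucket, clone._ctx)
```

The copy rebuilds its own context the first time `context()` is called, which matches the tool's advice to move the instantiation into the scope of the function or class.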

I have a clarification question on this. For a Ray actor, is it one message at a time per method or per object?

One Ray actor can process one message at a time. Here, a “message” is an actor method invocation made by another Ray task, driver, or actor through an actor handle. In other words, while one actor method is running, the others cannot run. There are ways to get around this:

Concurrent call

# Start 5 threads to process messages concurrently. Note that the class must be thread-safe
# for this to be safe.
a = Actor.options(max_concurrency=5).remote()

Async actor
https://docs.ray.io/en/master/async_api.html#asyncio-concurrency-for-actors

Is there a chance that method_a and method_b would run concurrently, or are we guaranteed that one of them would run to completion before the other? I don’t care about the order they run in here, just whether or not they could run at the same time.

In this case, method_a is processed first, and method_b is processed after it completes. The ordering of calls from each caller is guaranteed. If you would like more concurrency when using actors, you can try an actor pool: ray.util.actor_pool — Ray v2.0.0.dev0

Thank you for the answer. Is this true even if the actor is being called from several different actors running in different processes, like RLlib rollout workers? In my case I am trying to limit concurrency. I have variables that are modified by multiple methods. I am currently using a lock inside these methods, but your earlier comment made me think I could remove it.

Is this true even if the actor is being called from several different

Can you tell me what “this” means here? (I answered two questions: ordering and concurrency.)

I have an actor (A) that counts RLlib environment episodes and then restarts an application after n episodes. I have 50 RLlib rollout workers that each call A periodically. The rollout workers are spread out across several nodes. So if rollout workers a and b on different nodes call A.method_a.remote() and A.method_b.remote() at the same time, will only one be processed at a time?

Oh yeah, you are right. You don’t need locks unless you have max_concurrency configured there.
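The same reasoning can be shown with plain threads. This is only an analogy, not Ray code: a `ThreadPoolExecutor` with one worker stands in for a default actor, five workers stand in for `max_concurrency=5`, and `EpisodeCounter` is a hypothetical stand-in for the actor's state.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class EpisodeCounter:
    """Hypothetical stand-in for the state held by the counting actor."""

    def __init__(self):
        self.episodes = 0
        self._lock = threading.Lock()

    def add_unlocked(self):
        self.episodes += 1  # safe only if calls never overlap

    def add_locked(self):
        with self._lock:  # needed once calls can overlap
            self.episodes += 1


def run(method_name, workers, calls=1000):
    counter = EpisodeCounter()
    # Leaving the with-block waits for every submitted call to finish.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for _ in range(calls):
            pool.submit(getattr(counter, method_name))
    return counter.episodes


serial_total = run("add_unlocked", workers=1)  # one call at a time, like a default actor
threaded_total = run("add_locked", workers=5)  # overlapping calls, so the lock stays
print(serial_total, threaded_total)
```

However many callers submit work, a single worker runs the calls one by one, which is why the lock becomes unnecessary in that configuration.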

Hey, thanks for the info.
It’s pretty clear now!