How to effectively use parallelization with Ray in Python?

I’m trying to learn how to use the Ray API and am comparing it with my joblib code. However, I don’t know how to use it effectively (my machine has 16 CPUs).

Am I doing something incorrectly? If not, why is Ray so much slower?

import ray 
from joblib import Parallel, delayed

num_cpus = 16

@ray.remote(num_cpus=num_cpus)
def square(x):
    return x * x

def square2(x):
    return x * x

Ray:

%%time
# Launch parallel square tasks.
futures = [square.remote(x=i) for i in range(1000)]

# Retrieve results.
print(len(ray.get(futures)))
# CPU times: user 310 ms, sys: 79.7 ms, total: 390 ms
# Wall time: 612 ms

Joblib:

%%time
futures = Parallel(n_jobs=num_cpus)(delayed(square2)(x=i) for i in range(1000))
print(len(futures))
# CPU times: user 92.5 ms, sys: 21.4 ms, total: 114 ms
# Wall time: 106 ms

I think there was an earlier question about this and I left an answer there, but I don’t see that thread anymore; was that you, and if so, could you restore it? Here is what I remember from my last post:

Ray schedules tasks concurrently based on their CPU requirement. If you set num_cpus=16 on a 16-core machine, this tells Ray that each task needs all 16 CPUs in order to run. Thus, each task here will run sequentially. Try running this again with @ray.remote and no arguments, and the tasks should now be able to run in parallel.
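
For example, a minimal sketch of that fix; the only change from your code is dropping num_cpus from the decorator:

import ray

ray.init()  # Ray auto-detects the 16 cores

@ray.remote  # no num_cpus argument; each task requests 1 CPU by default
def square(x):
    return x * x

# Up to 16 of these tasks can now run at the same time.
futures = [square.remote(i) for i in range(1000)]
print(len(ray.get(futures)))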

You should also check out the Ray design patterns guide for advice on how best to parallelize your code. Multiplying two integers is nearly instantaneous, so the relative system overhead will be high in this case. In an actual workload, you should ideally make each task longer, e.g., by multiplying many integers in each task.
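
For example, here is a rough sketch of batching the toy workload above so each task does more work (the chunk size of 100 is arbitrary):

@ray.remote
def square_batch(xs):
    # Square a whole chunk per task so the work outweighs the scheduling overhead.
    return [x * x for x in xs]

chunk_size = 100  # arbitrary; pick it so each task runs well over ~10 ms
chunks = [list(range(i, i + chunk_size)) for i in range(0, 1000, chunk_size)]
futures = [square_batch.remote(chunk) for chunk in chunks]
results = [y for batch in ray.get(futures) for y in batch]
print(len(results))  # 1000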

Basically I’m doing a full rewrite of some code I wrote in grad school: soothsayer/soothsayer_clairvoyance.py at master · jolespin/soothsayer · GitHub

I was mostly trying to get familiar with Ray’s usage compared with what I already know (joblib).

How can I specify the total number of CPUs Ray can use?

Ah, here is the original thread I was thinking of, on Stack Overflow.

By default, it will be the number of vCPUs on your machine, but you can override it by passing num_cpus to ray.init() if you’re running locally, or --num-cpus to ray start on a cluster.
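
For example (the CPU count of 8 here is just for illustration):

import ray

# Locally: cap Ray at 8 CPUs instead of the 16 it would auto-detect.
ray.init(num_cpus=8)

# On a cluster, pass the equivalent flag to the CLI instead:
#   ray start --head --num-cpus=8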

Thanks! I’ve sped up my code quite a bit, but I get the same performance with n_jobs=1 and n_jobs=16.

Here’s the function I’m parallelizing:

        @ray.remote
        def _fit_estimators(X_A, X_B, y_A, y_B, estimator):
            """
            """
            with warnings.catch_warnings():
                # warnings.filterwarnings("ignore", category=ConvergenceWarning)
                warnings.filterwarnings("ignore", category=UndefinedMetricWarning)

                # Subset A
                estimator.fit(X_A, y_A)
                weight_A = getattr(estimator, self.feature_weight_attribute_)
                score_A_B = self.scorer(estimator=estimator, X=X_B, y_true=y_B)

                # Subset B
                estimator.fit(X_B, y_B)
                weight_B = getattr(estimator, self.feature_weight_attribute_)
                score_B_A = self.scorer(estimator=estimator, X=X_A, y_true=y_A)

            return [weight_A, weight_B], [score_A_B, score_B_A]

I’m calling it like this:

                futures = [_fit_estimators.remote(estimator=estimator, **training_data) for estimator in self.estimators_.values()]
                parallel_results = ray.get(futures)

It’s a bit hard to say without seeing the rest of your code, but generally if you are not seeing a speedup on multicore, it could be for one of these reasons:

  1. The time per task is very short. The longer the task the better, but generally for tasks over 10ms you should see at least some speedup.
  2. The tasks themselves are multi-threaded already, so there is no benefit in parallelizing with Ray. In fact, you may even see the code slow down if using Ray because of contention. In this case, you can use Ray to scale up to multiple machines, but it’s likely that multicore single-node performance is already maximized.
  3. Too much overhead from data copying. Ideally, you should use shared memory for passing objects between tasks. One thing to check in the code you provided is training_data. Ray makes it efficient to pass large objects like numpy arrays between tasks, but it doesn’t deduplicate values automatically for you. So if you need to pass the same training_data to multiple tasks from the driver, first call ray.put on each object and then pass the ObjectRef instead of the raw Python value (see the sketch after this list). More info in this design pattern doc.
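
As a rough sketch of that third point applied to your call site (inside the same method as your snippet), assuming training_data is a dict holding X_A, X_B, y_A, and y_B, i.e. the names from your function signature:

# Put each large array into the object store once...
training_refs = {key: ray.put(value) for key, value in training_data.items()}

# ...then pass the ObjectRefs instead of the raw arrays. Ray resolves
# top-level ObjectRef arguments inside each task, so the data is shared
# rather than re-serialized for every estimator.
futures = [
    _fit_estimators.remote(estimator=estimator, **training_refs)
    for estimator in self.estimators_.values()
]
parallel_results = ray.get(futures)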

Other docs to check out are the design patterns guides and troubleshooting guide.