CPU cores, CPU threads, and scaling of Ray tasks

How severely does this issue affect your experience of using Ray?

  • Low: It annoys or frustrates me for a moment.

Hi,

I am using Ray to implement a project on a cluster. While trying to determine the best way to use Ray’s capabilities for my specific case, I decided to examine how Ray’s task execution time scales with the resources available on my local system, and the result I obtained left me puzzled.

Below are my system specifications, the script I ran, and the output I obtained.

System specifications

Intel® Xeon® Processor E5-2630 v4, 64 GB RAM, Ubuntu 22.04.4 LTS, Python 3.11.9, Ray 2.24.0

Python script
import time

import numpy as np

PHYSICAL_CORES_COUNT = 10


def f():
    a = np.arange(10000).reshape(100, 100)
    for _ in range(5000):
        a @ a


def ray_tasks():
    import ray

    ray.init()

    for i in range(7):
        counter = time.perf_counter()

        ray.get([ray.remote(f).remote() for _ in range(i + PHYSICAL_CORES_COUNT - 3)])

        print("Ray tasks", i + PHYSICAL_CORES_COUNT - 3, time.perf_counter() - counter)

    ray.shutdown()


def ray_pool():
    import ray
    from ray.util.multiprocessing import Pool

    for i in range(7):
        ray.init()

        counter = time.perf_counter()

        if __name__ == "__main__":
            with Pool(processes=i + PHYSICAL_CORES_COUNT - 3) as pool:
                pool.apply(f)

        print("Ray pool", i + PHYSICAL_CORES_COUNT - 3, time.perf_counter() - counter)

        ray.shutdown()


def multiprocessing_pool():
    from multiprocessing import Pool

    for i in range(7):
        counter = time.perf_counter()

        if __name__ == "__main__":
            with Pool(processes=i + PHYSICAL_CORES_COUNT - 3) as pool:
                pool.apply(f)

        print(
            "Multiprocessing pool",
            i + PHYSICAL_CORES_COUNT - 3,
            time.perf_counter() - counter,
        )


def joblib():
    from joblib import Parallel, delayed

    for i in range(7):
        counter = time.perf_counter()

        Parallel(n_jobs=i + PHYSICAL_CORES_COUNT - 3)(delayed(f)() for _ in range(1))

        print("Joblib", i + PHYSICAL_CORES_COUNT - 3, time.perf_counter() - counter)


if __name__ == "__main__":
    ray_tasks()
    ray_pool()
    multiprocessing_pool()
    joblib()

Shell output
2024-06-12 16:29:12,041	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray tasks 7 5.529587419005111
Ray tasks 8 5.4087012010859326
Ray tasks 9 5.295977437053807
Ray tasks 10 5.4375265730777755
Ray tasks 11 7.53721311094705
Ray tasks 12 7.639187020016834
Ray tasks 13 7.737383277970366
2024-06-12 16:30:02,052	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 7 6.063297827960923
2024-06-12 16:30:13,544	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 8 5.95153168507386
2024-06-12 16:30:24,815	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 9 6.1848391180392355
2024-06-12 16:30:36,278	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 10 6.122179424972273
2024-06-12 16:30:47,792	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 11 6.447680699988268
2024-06-12 16:30:59,664	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 12 6.408578730071895
2024-06-12 16:31:11,551	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 13 6.443478961009532
Multiprocessing pool 7 4.402756509021856
Multiprocessing pool 8 4.378966056974605
Multiprocessing pool 9 4.403473759070039
Multiprocessing pool 10 4.3998749039601535
Multiprocessing pool 11 4.394448317005299
Multiprocessing pool 12 4.356208867044188
Multiprocessing pool 13 4.381642031017691
Joblib 7 4.386512518976815
Joblib 8 4.515030126902275
Joblib 9 4.511033034999855
Joblib 10 4.548703449079767
Joblib 11 4.687342124991119
Joblib 12 4.604893620009534
Joblib 13 4.515453343046829

Essentially, I wrote a simple function that takes about 4 seconds to execute on my machine, and I used Ray tasks to see how long it would take to run that function concurrently in multiple processes. I expected that, as long as I did not exceed the number of CPU threads available on my machine, the script would complete with only minimal added overhead compared to a single execution of the function. The results show that this is not quite the case. Ray’s overhead is significant, but that is not my main concern. Execution time remains fairly constant up to 10 tasks (about 5.3 to 5.5 seconds), then jumps to about 7.5 to 7.7 seconds from 11 tasks onwards. In other words, once the number of tasks exceeds the number of physical CPU cores (not CPU threads), the execution time increases notably.

For comparison, I repeated the exercise using Ray’s distributed multiprocessing.Pool, the standard multiprocessing module, and joblib. In all three cases, I obtained the scaling I was expecting. However, I note that Ray performed noticeably worse than multiprocessing and joblib.
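For reference, the distinction between CPU cores and CPU threads can be checked on Linux with a small standard-library sketch. It assumes an x86-style /proc/cpuinfo that lists `processor`, `physical id`, and `core id` fields; the helper name `count_cores` and the `SAMPLE` excerpt are hypothetical and not part of the benchmark.

```python
import os


def count_cores(cpuinfo_text):
    """Return (logical, physical) CPU counts from /proc/cpuinfo-style text."""
    logical = 0
    physical = set()
    package = None
    for line in cpuinfo_text.splitlines():
        key, _, value = line.partition(":")
        key, value = key.strip(), value.strip()
        if key == "processor":
            # One "processor" entry per hardware thread.
            logical += 1
        elif key == "physical id":
            # Socket (package) the current hardware thread belongs to.
            package = value
        elif key == "core id":
            # Hardware threads sharing (package, core id) share one core.
            physical.add((package, value))
    return logical, len(physical)


# Hypothetical excerpt: one package, two cores, two hardware threads per core.
SAMPLE = """\
processor : 0
physical id : 0
core id : 0

processor : 1
physical id : 0
core id : 1

processor : 2
physical id : 0
core id : 0

processor : 3
physical id : 0
core id : 1
"""

print(count_cores(SAMPLE))  # (4, 2)

# On a Linux machine, the same helper can read the real file.
if os.path.exists("/proc/cpuinfo"):
    with open("/proc/cpuinfo") as fh:
        print(count_cores(fh.read()))
```

The second number is the one that corresponds to PHYSICAL_CORES_COUNT in the script above.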

My question is: am I doing something wrong or is there a problem with Ray?

EDIT: I ran similar code on my laptop, which has a more recent architecture, and the results for the Ray tasks approach are still puzzling. Any hint as to what is going on would be greatly appreciated.

System specifications

Intel® Core™ i7-12700H Processor, 32 GB RAM, Ubuntu 22.04.4 LTS, Python 3.11.9, Ray 2.24.0

Python script
import time

import numpy as np


def f():
    a = np.arange(10000).reshape(100, 100)
    for _ in range(5000):
        a @ a


def ray_tasks():
    import ray

    ray.init()

    for i in range(20):
        counter = time.perf_counter()

        ray.get([ray.remote(f).remote() for _ in range(i + 1)])

        print("Ray tasks", i + 1, time.perf_counter() - counter)

    ray.shutdown()


def ray_pool():
    import ray
    from ray.util.multiprocessing import Pool

    for i in range(20):
        ray.init()

        counter = time.perf_counter()

        if __name__ == "__main__":
            with Pool(processes=i + 1) as pool:
                pool.apply(f)

        print("Ray pool", i + 1, time.perf_counter() - counter)

        ray.shutdown()


def multiprocessing_pool():
    from multiprocessing import Pool

    for i in range(20):
        counter = time.perf_counter()

        if __name__ == "__main__":
            with Pool(processes=i + 1) as pool:
                pool.apply(f)

        print(
            "Multiprocessing pool",
            i + 1,
            time.perf_counter() - counter,
        )


def joblib():
    from joblib import Parallel, delayed

    for i in range(20):
        counter = time.perf_counter()

        Parallel(n_jobs=i + 1)(delayed(f)() for _ in range(1))

        print("Joblib", i + 1, time.perf_counter() - counter)


if __name__ == "__main__":
    ray_tasks()
    ray_pool()
    multiprocessing_pool()
    joblib()

Shell output
2024-06-12 23:53:55,393	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray tasks 1 1.970784067000011
Ray tasks 2 1.9929256460000033
Ray tasks 3 2.0643412360000184
Ray tasks 4 2.0938583820000076
Ray tasks 5 2.192866852999998
Ray tasks 6 2.2023652229999584
Ray tasks 7 2.9399570940000217
Ray tasks 8 3.0420173849999514
Ray tasks 9 3.0500584799999615
Ray tasks 10 3.175444055000014
Ray tasks 11 3.1887840939999705
Ray tasks 12 3.271405764000008
Ray tasks 13 3.5609880169999997
Ray tasks 14 3.6052319080000075
Ray tasks 15 3.695604482999954
Ray tasks 16 3.775614377000011
Ray tasks 17 3.960126572999968
Ray tasks 18 4.051167329000009
Ray tasks 19 4.20454326600003
Ray tasks 20 4.40511036099997
2024-06-12 23:55:01,491	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 1 2.257755994999968
2024-06-12 23:55:07,605	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 2 2.329595984999969
2024-06-12 23:55:13,788	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 3 2.2923772070000155
2024-06-12 23:55:19,875	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 4 2.327277007999953
2024-06-12 23:55:25,983	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 5 2.3225214020000067
2024-06-12 23:55:31,967	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 6 2.371522110000001
2024-06-12 23:55:38,283	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 7 2.4213874860000146
2024-06-12 23:55:44,349	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 8 2.414436659000046
2024-06-12 23:55:50,537	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 9 2.4286837869999545
2024-06-12 23:55:56,724	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 10 2.4691437530000258
2024-06-12 23:56:02,915	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 11 2.4919973339999615
2024-06-12 23:56:08,988	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 12 2.4796971289999874
2024-06-12 23:56:15,365	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 13 2.518938802999969
2024-06-12 23:56:21,446	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 14 2.5713047920000918
2024-06-12 23:56:27,828	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 15 2.565930156000036
2024-06-12 23:56:34,113	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 16 2.595815722999987
2024-06-12 23:56:40,501	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 17 2.614837869999974
2024-06-12 23:56:46,788	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 18 2.669612454000003
2024-06-12 23:56:53,305	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 19 2.6797189239999852
2024-06-12 23:56:59,818	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 20 2.7144729949999373
Multiprocessing pool 1 1.8585168200000908
Multiprocessing pool 2 1.8712580300000354
Multiprocessing pool 3 1.8769967529999576
Multiprocessing pool 4 1.864420897000059
Multiprocessing pool 5 1.923539025000082
Multiprocessing pool 6 1.8646343560000105
Multiprocessing pool 7 1.868786837000016
Multiprocessing pool 8 1.8735884349999878
Multiprocessing pool 9 1.8758631030000288
Multiprocessing pool 10 1.887016770999935
Multiprocessing pool 11 1.8737686750000648
Multiprocessing pool 12 1.8753576350000003
Multiprocessing pool 13 1.8865805980000232
Multiprocessing pool 14 1.8779698019999387
Multiprocessing pool 15 1.8914761739999904
Multiprocessing pool 16 1.8764429469999868
Multiprocessing pool 17 1.885768778000056
Multiprocessing pool 18 1.8851808620000838
Multiprocessing pool 19 1.8878558170000588
Multiprocessing pool 20 1.8886325659999557
Joblib 1 1.8592180939999707
Joblib 2 2.020081859999891
Joblib 3 2.0148214810000127
Joblib 4 2.019257920999962
Joblib 5 2.028250092999997
Joblib 6 2.110363760000041
Joblib 7 2.046189142000003
Joblib 8 1.905242625000028
Joblib 9 1.9169077079999397
Joblib 10 1.9456904459999578
Joblib 11 2.053297143000009
Joblib 12 1.9113791859999765
Joblib 13 1.8826983330000076
Joblib 14 1.8873127709999835
Joblib 15 1.8995699490000106
Joblib 16 1.8798381699999709
Joblib 17 1.890441494999891
Joblib 18 1.8876806759999454
Joblib 19 1.8911033690000068
Joblib 20 1.8818368070000133

My bad, sorry. I was not using the pool resources properly and got confused in my interpretation. Please find the intended code and results below. Feel free to close the thread.
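To make the difference between the earlier scripts and the corrected one concrete: `Pool.apply` runs a single call no matter how many workers the pool has, whereas `Pool.map` runs one call per item. The sketch below is only an illustration. It assumes `multiprocessing.pool.ThreadPool` as a lightweight stand-in that shares the `apply`/`map` API with `multiprocessing.Pool`, and the names `work` and `count_calls` are hypothetical.

```python
import threading
import time
from multiprocessing.pool import ThreadPool  # same apply/map API as multiprocessing.Pool


def work(seconds):
    """Stand-in for f(): block for a while, then report which worker ran it."""
    time.sleep(seconds)
    return threading.get_ident()


def count_calls(pool_size, use_map):
    """Return how many times work() ran for one submission to the pool."""
    with ThreadPool(processes=pool_size) as pool:
        if use_map:
            # One call per list item, spread over the workers.
            results = pool.map(work, [0.05] * pool_size)
        else:
            # A single call on a single worker; the other workers stay idle.
            results = [pool.apply(work, (0.05,))]
    return len(results)


apply_calls = count_calls(4, use_map=False)
map_calls = count_calls(4, use_map=True)
print("apply:", apply_calls, "call(s); map:", map_calls, "call(s)")
# apply: 1 call(s); map: 4 call(s)
```

This is why the first pool, multiprocessing, and joblib timings stayed flat: each measured one execution of f, not one per worker.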

Python script
import time

import numpy as np

PHYSICAL_CORES_COUNT = 10


def f(iterations):
    a = np.arange(10000).reshape(100, 100)
    for _ in range(iterations):
        a @ a


class Actor:
    def f(self, iterations):
        return f(iterations)


def ray_tasks():
    import ray

    ray.init()

    for i in range(7):
        thread_count = i + PHYSICAL_CORES_COUNT - 3

        counter = time.perf_counter()

        ray.get([ray.remote(f).remote(j) for j in [5000] * thread_count])

        print("Ray tasks", thread_count, time.perf_counter() - counter)

    ray.shutdown()


def ray_pool():
    import ray
    from ray.util.multiprocessing import Pool

    for i in range(7):
        thread_count = i + PHYSICAL_CORES_COUNT - 3

        ray.init()

        counter = time.perf_counter()

        if __name__ == "__main__":
            with Pool(processes=thread_count) as pool:
                pool.map(f, [5000] * thread_count)

        print("Ray pool", thread_count, time.perf_counter() - counter)

        ray.shutdown()


def ray_actor_pool():
    import ray
    from ray.util import ActorPool

    ray.init()

    for i in range(7):
        thread_count = i + PHYSICAL_CORES_COUNT - 3

        counter = time.perf_counter()

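        # One actor per worker slot; map() hands each actor one 5000-iteration job.
        pool = ActorPool([ray.remote(Actor).remote() for _ in range(thread_count)])
        for _ in pool.map(lambda a, v: a.f.remote(v), [5000] * thread_count):
            pass  # Drain the iterator so every job finishes before timing stops.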

        print("Ray actor pool", thread_count, time.perf_counter() - counter)

    ray.shutdown()


def multiprocessing_pool():
    from multiprocessing import Pool

    for i in range(7):
        thread_count = i + PHYSICAL_CORES_COUNT - 3

        counter = time.perf_counter()

        if __name__ == "__main__":
            with Pool(processes=thread_count) as pool:
                pool.map(f, [5000] * thread_count)

        print("Multiprocessing pool", thread_count, time.perf_counter() - counter)


def joblib():
    from joblib import Parallel, delayed

    for i in range(7):
        thread_count = i + PHYSICAL_CORES_COUNT - 3

        counter = time.perf_counter()

        Parallel(n_jobs=thread_count)(delayed(f)(j) for j in [5000] * thread_count)

        print("Joblib", thread_count, time.perf_counter() - counter)


if __name__ == "__main__":
    ray_tasks()
    ray_pool()
    ray_actor_pool()
    multiprocessing_pool()
    joblib()

Shell output
2024-06-25 16:41:22,350	INFO worker.py:1761 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray tasks 7 5.689788408111781
Ray tasks 8 5.32903665304184
Ray tasks 9 5.386964575154707
Ray tasks 10 5.364450054941699
Ray tasks 11 7.413882049033418
Ray tasks 12 7.5424694628454745
Ray tasks 13 7.713358506094664
2024-06-25 16:42:12,060	INFO worker.py:1761 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 7 6.234736783895642
2024-06-25 16:42:23,518	INFO worker.py:1761 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 8 6.17812447110191
2024-06-25 16:42:34,763	INFO worker.py:1761 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 9 6.2092187099624425
2024-06-25 16:42:46,428	INFO worker.py:1761 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 10 6.330793595872819
2024-06-25 16:42:58,062	INFO worker.py:1761 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 11 8.792797883972526
2024-06-25 16:43:12,123	INFO worker.py:1761 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 12 8.964155843947083
2024-06-25 16:43:26,283	INFO worker.py:1761 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray pool 13 9.077670622151345
2024-06-25 16:43:40,532	INFO worker.py:1761 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Ray actor pool 7 6.05042157205753
Ray actor pool 8 6.052213195012882
Ray actor pool 9 6.12075347895734
Ray actor pool 10 6.44338786508888
Ray actor pool 11 8.636523112189025
Ray actor pool 12 8.85830734204501
Ray actor pool 13 8.974337782943621
Multiprocessing pool 7 5.1710300298873335
Multiprocessing pool 8 5.230295533081517
Multiprocessing pool 9 5.294169247848913
Multiprocessing pool 10 5.333001541905105
Multiprocessing pool 11 7.1854413310065866
Multiprocessing pool 12 7.5464570571202785
Multiprocessing pool 13 7.603542048018426
Joblib 7 5.241937935119495
Joblib 8 5.222586184972897
Joblib 9 5.29414604511112
Joblib 10 5.35186168202199
Joblib 11 7.396942375926301
Joblib 12 7.35550906509161
Joblib 13 7.5072955440264195