Question: counting how many tasks each cluster node executed

How severely does this issue affect your experience of using Ray?

  • Medium: It causes significant difficulty in completing my task, but I can work around it.

Hi, here is my demo code:

from collections import Counter
import socket
import time
import ray

ray.init()


@ray.remote
class test:
    @staticmethod
    def f():
        time.sleep(3)
        # Return IP address.
        return socket.gethostbyname(socket.gethostname())


print('''This cluster consists of
    {} nodes in total
    {} CPU resources in total
'''.format(len(ray.nodes()), ray.cluster_resources()['CPU']))

wks = [test.remote() for _ in range(3)]
# Keep every ObjectRef in a list; reassigning one variable inside a loop keeps only the last.
object_ids = [wk.f.remote() for wk in wks]

# ip_addresses = ray.get(object_ids)
# print('Tasks executed')
# for ip_address, num_tasks in Counter(ip_addresses).items():
#     print('{} tasks on {}'.format(num_tasks, ip_address))

Explanation:

After all of the above tasks are submitted to the Ray cluster, I want to count how many tasks (calls to f()) each node (IP address) executed. My demo code is based on an example from the Ray documentation.
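The counting step itself is plain Python: `collections.Counter` tallies how often each IP string appears. It needs a list with one IP per task; passing a single IP string instead makes it count that string's characters. The addresses below are made-up placeholders, not real results.

```python
from collections import Counter

# Hypothetical results: one IP string per completed f() call.
ip_addresses = ['10.0.0.1', '10.0.0.2', '10.0.0.1']
per_node = Counter(ip_addresses)
for ip_address, num_tasks in per_node.items():
    print('{} tasks on {}'.format(num_tasks, ip_address))
# 2 tasks on 10.0.0.1
# 1 tasks on 10.0.0.2

# Pitfall: a single string is iterated character by character.
wrong = Counter('10.0.0.1')
print(wrong['0'])  # → 3
```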

With Ray, the invocation of every remote operation (e.g., task, actor method) is asynchronous. This means that the operation immediately returns a promise/future, which is essentially an identifier (ID) of the operation’s result. This is key to achieving parallelism, as it allows the driver program to launch multiple operations in parallel. To get the actual results, the programmer needs to call ray.get() on the IDs of the results. This call blocks until the results are available. As a side effect, this operation also blocks the driver program from invoking other operations, which can hurt parallelism.

I created multiple actors and had each of them execute f(). This is where I run into difficulty: as the Ray docs quoted above say, if I call ray.get() to fetch the IP results directly, the call blocks.

I am working through the Ray documentation slowly and carefully, and it keeps me grounded.

Thank you!

I looked at the Ray documentation again and found a solution. With the code adjusted as follows, it meets my basic needs, but I don't know whether there is a better approach.

from collections import Counter
import socket
import time
import ray

ray.init()


@ray.remote
class test:
    @staticmethod
    def f():
        time.sleep(3)
        # Return IP address.
        return socket.gethostbyname(socket.gethostname())


print('''This cluster consists of
    {} nodes in total
    {} CPU resources in total
'''.format(len(ray.nodes()), ray.cluster_resources()['CPU']))
# Create 8 actors.
wks = [test.remote() for _ in range(8)]
# Call each actor's f() and block once until all IP addresses are available.
ip_addresses = ray.get([wk.f.remote() for wk in wks])
print('Tasks executed')
for ip_address, num_tasks in Counter(ip_addresses).items():
    print('{} tasks on {}'.format(num_tasks, ip_address))