I can write a Ray function that returns a generator like so
from typing import List, Iterable
import ray
@ray.remote(num_returns="dynamic")
def capitalize(strings: List[str]) -> Iterable[str]:
for string in strings:
yield string.upper()
def main():
strings = ["The", "man", "is", "named", "John"]
g = capitalize.remote(strings)
for item in ray.get(g):
print(ray.get(item))
if __name__ == "__main__":
main()
This produces the expected output.
2023-04-14 16:11:55,103 INFO worker.py:1364 -- Connecting to existing Ray cluster at address: 127.0.0.1:6379...
2023-04-14 16:11:55,111 INFO worker.py:1544 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:8265
THE
MAN
IS
NAMED
JOHN
I’d like to do the same thing with an Actor class.
from typing import List, Iterable
import ray
@ray.remote
class Capitalizer:
def capitalize(self, strings: List[str]) -> Iterable[str]:
for string in strings:
yield string.upper()
def main():
strings = ["The", "man", "is", "named", "John"]
c = Capitalizer.remote()
g = c.capitalize.remote(strings)
for item in ray.get(g):
print(ray.get(item))
if __name__ == "__main__":
main()
When I run the Actor class version I get the following error:
Traceback (most recent call last):
File "/Users/bill.mcneill/Src/mine/simple.py", line 35, in <module>
main()
File "/Users/bill.mcneill/Src/mine/simple.py", line 31, in main
print(ray.get(item))
File "/usr/local/anaconda3/envs/mine/lib/python3.9/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
return func(*args, **kwargs)
File "/usr/local/anaconda3/envs/mine/lib/python3.9/site-packages/ray/_private/worker.py", line 2369, in get
raise ValueError(
ValueError: 'object_refs' must either be an ObjectRef or a list of ObjectRefs.
(Capitalizer pid=11434) 2023-04-14 16:05:33,556 ERROR worker.py:772 -- Unhandled error: Task threw exception, but all 1 return values already created. This should only occur when using generator tasks.
(Capitalizer pid=11434) See https://github.com/ray-project/ray/issues/28689.
(Capitalizer pid=11434) Traceback (most recent call last):
(Capitalizer pid=11434) File "python/ray/_raylet.pyx", line 931, in ray._raylet.execute_task
(Capitalizer pid=11434) File "python/ray/_raylet.pyx", line 968, in ray._raylet.execute_task
(Capitalizer pid=11434) File "python/ray/_raylet.pyx", line 2500, in ray._raylet.CoreWorker.store_task_outputs
(Capitalizer pid=11434) ValueError: Task returned more than num_returns=1 objects.
I think the problem here is that I haven’t specified num_returns="dynamic"
anywhere in my Actor class. But I can’t figure out where to specify this.
How do I get the Actor version of this algorithm to work?