I am encountering the following error while running an async wait/get using Ray:
```
2021-11-01 09:10:41,844 - ray.util.client.dataclient - INFO - Unrecoverable error in data channel.
2021-11-01 09:10:41,844 - ray.util.client.dataclient - INFO - Shutting down data channel
Traceback (most recent call last):
  File "./examples/phoenix_select.py", line 23, in <module>
    df = phx.select(table='market_data.identifier', start_date=sd, end_date=ed)
  File "/home/srogers/repositories/phoenix/phoenix/client.py", line 210, in select
    verbose=verbose)
  File "/home/srogers/repositories/phoenix/phoenix/client.py", line 65, in _get_partitioned_ray
    out = asyncio.run(res)
  File "/opt/python3/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/opt/python3/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
  File "/home/srogers/repositories/phoenix/phoenix/client.py", line 101, in _get_dates
    out = await asyncio.gather(*done_id)
  File "/opt/python3/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/opt/python3/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/opt/python3/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
    return (yield from awaitable.__await__())
  [Previous line repeated 331 more times]
ConnectionError: Failed during this or a previous request. Exception that broke the connection: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.NOT_FOUND
	details = "Attempted to reconnect a session that has already been cleaned up"
	debug_error_string = "{"created":"@1635772241.843833378","description":"Error received from peer ipv4:10.9.67.210:10001","file":"src/core/lib/surface/call.cc","file_line":1069,"grpc_message":"Attempted to reconnect a session that has already been cleaned up","grpc_status":5}"
>
```
The job being executed in Ray is fairly memory intensive. When running similar jobs that require slightly less memory (e.g., a smaller dataset), the job completes just fine. The basic pattern that I'm trying to run is:
```python
results = [Actor.remote().task.remote() for _ in range(n)]
out = []
while results:
    done_id, results = await asyncio.wait(results, timeout=2)
    # collect each batch as it completes
    out.extend(await asyncio.gather(*done_id))
```
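For context, Actor is an ordinary Ray actor along these lines (simplified; the real task loads one date partition of the dataset and is what makes the job memory heavy):

```python
import ray

@ray.remote
class Actor:
    def task(self):
        # Loads and returns one partition of the dataset; the real
        # implementation is memory intensive.
        ...
```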
The raylet logs are not particularly insightful; however, it does appear that both object store memory and RAM on the machine fill up right before the job dies and the error is thrown.
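A rough way to watch this from the driver (my sketch here, not how I originally spotted it in the logs; the units reported for object_store_memory depend on the Ray version):

```python
import time
import ray

# Poll cluster-level resource availability; the free
# 'object_store_memory' shrinks toward zero shortly
# before the connection drops.
while True:
    avail = ray.available_resources()
    total = ray.cluster_resources()
    print("object_store_memory free:",
          avail.get("object_store_memory", 0), "/",
          total.get("object_store_memory", 0))
    time.sleep(5)
```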
If we instead do a simple synchronous ray.wait and ray.get loop, we do not encounter this problem.
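For reference, the synchronous version that completes without issue is essentially (simplified):

```python
results = [Actor.remote().task.remote() for _ in range(n)]
out = []
while results:
    # ray.wait blocks for up to 2s and returns (ready, not_ready).
    done_id, results = ray.wait(results, timeout=2)
    out.extend(ray.get(done_id))
```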
My thought was that the asynchronous execution was causing too many concurrent downloads from the cluster; however, even if I use a semaphore to limit concurrency (to as little as one in-flight download), I still see the same issue.
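The semaphore variant I tried looks roughly like this (the fetch helper is illustrative, not my actual code; it runs inside the same async function as the pattern above):

```python
import asyncio

sem = asyncio.Semaphore(1)  # even with the limit at 1, the error persists

async def fetch(ref):
    # Only one ObjectRef is awaited (i.e. downloaded) at a time.
    async with sem:
        return await ref

refs = [Actor.remote().task.remote() for _ in range(n)]
out = await asyncio.gather(*(fetch(r) for r in refs))
```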
As a temporary workaround, I limit the number of tasks that can be submitted to the Ray cluster at once, so the jobs get submitted in chunks (sketched below), but this isn't ideal.
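The chunked workaround is essentially the following, with chunk_size (a name I'm using here for illustration) tuned by hand until the object store stops filling up:

```python
# Submit and drain in fixed-size chunks so the object store never
# holds more than chunk_size results at once.
out = []
for start in range(0, n, chunk_size):
    refs = [Actor.remote().task.remote()
            for _ in range(start, min(start + chunk_size, n))]
    out.extend(ray.get(refs))
```

Any suggestions?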