"grpc_message":"Attempted to reconnect a session that has already been cleaned up"

I am encountering the following error while running an async wait/get using Ray:

2021-11-01 09:10:41,844 - ray.util.client.dataclient - INFO - Unrecoverable error in data channel.
2021-11-01 09:10:41,844 - ray.util.client.dataclient - INFO - Shutting down data channel
Traceback (most recent call last):
  File "./examples/phoenix_select.py", line 23, in <module>
    df = phx.select(table='market_data.identifier', start_date=sd, end_date=ed)
  File "/home/srogers/repositories/phoenix/phoenix/client.py", line 210, in select
    verbose=verbose)
  File "/home/srogers/repositories/phoenix/phoenix/client.py", line 65, in _get_partitioned_ray
    out = asyncio.run(res)
  File "/opt/python3/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/opt/python3/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
  File "/home/srogers/repositories/phoenix/phoenix/client.py", line 101, in _get_dates
    out = await asyncio.gather(*done_id)
  File "/opt/python3/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/opt/python3/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/opt/python3/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
    return (yield from awaitable.__await__())
  [Previous line repeated 331 more times]
ConnectionError: Failed during this or a previous request. Exception that broke the connection: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.NOT_FOUND
        details = "Attempted to reconnect a session that has already been cleaned up"
        debug_error_string = "{"created":"@1635772241.843833378","description":"Error received from peer ipv4:10.9.67.210:10001","file":"src/core/lib/surface/call.cc","file_line":1069,"grpc_message":"Attempted to reconnect a session that has already been cleaned up","grpc_status":5}"

The job being executed in Ray is fairly memory-intensive. Similar jobs that require slightly less memory (i.e. a smaller dataset) complete just fine. The basic pattern I'm trying to run is:

results = [Actor.remote().task.remote() for i in range(n)]

out = []
while results:
    done_id, results = await asyncio.wait(results, timeout=2)
    out.extend(await asyncio.gather(*done_id))

The raylet logs are not particularly insightful; however, it does appear that object-store memory and RAM on the machine fill up right before the job dies and the error is thrown.

If we instead use a simple synchronous ray.wait/ray.get loop, we do not encounter this problem.

My thought was that the asynchronous execution was causing too many concurrent downloads from the cluster; however, even if I use a semaphore to limit concurrency (to as little as 1), I still see the same issue.
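The semaphore wrapping looks roughly like the sketch below; fetch_partition is a hypothetical placeholder for awaiting one Ray ObjectRef, since the real client code isn't shown here:

```python
import asyncio

async def fetch_partition(i, sem):
    # Placeholder for awaiting a Ray ObjectRef; only one coroutine
    # may be inside the critical section at a time when limit=1.
    async with sem:
        await asyncio.sleep(0)
        return i

async def main(n, limit):
    sem = asyncio.Semaphore(limit)
    tasks = [fetch_partition(i, sem) for i in range(n)]
    # gather preserves submission order regardless of completion order
    return await asyncio.gather(*tasks)

out = asyncio.run(main(8, limit=1))
```

Even with limit=1 (fully serialized awaits), the connection error above still occurs.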

As a temporary workaround, I limit the number of tasks that can be submitted to the Ray cluster so the jobs get submitted in chunks; however, this isn't ideal. Any suggestions?
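The chunked submission amounts to something like this sketch (the ray.get call is replaced by a stand-in so the helper is self-contained; chunk size 4 is arbitrary):

```python
def chunks(seq, size):
    # Yield successive fixed-size slices of seq
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

results = []
for batch in chunks(list(range(10)), size=4):
    # In the real code this would be:
    #   refs = [Actor.remote().task.remote(a) for a in batch]
    #   results.extend(ray.get(refs))
    # so at most `size` results are outstanding at once.
    results.extend(batch)  # stand-in for ray.get(refs)
```

This keeps memory bounded but serializes the batches, which wastes cluster parallelism between chunks.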