We are running a Dask on Ray workload and are occasionally seeing this error where the calculation fails when a node is lost. My expectation was that if a node was lost Ray could regenerate any missing intermediate results. Are Dask on Ray workloads safe to run on Spot instances where failures are expected?


Traceback (most recent call last):
  File "/Users/xxx/PycharmProjects/RelayTxRay/ray-test.py", line 37, in <module>
  File "/Users/xxx/PycharmProjects/RelayTxRay/ray-test.py", line 30, in main
    df.to_json('s3://xxx/xxx/ray/dask/xxx', compression='gzip')
  File "/usr/local/Caskroom/miniconda/base/envs/ray-test2/lib/python3.7/site-packages/dask/dataframe/core.py", line 1510, in to_json
    return to_json(self, filename, *args, **kwargs)
  File "/usr/local/Caskroom/miniconda/base/envs/ray-test2/lib/python3.7/site-packages/dask/dataframe/io/json.py", line 88, in to_json
    dask_compute(parts, **compute_kwargs)
  File "/usr/local/Caskroom/miniconda/base/envs/ray-test2/lib/python3.7/site-packages/dask/base.py", line 567, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/usr/local/Caskroom/miniconda/base/envs/ray-test2/lib/python3.7/site-packages/ray/util/dask/scheduler.py", line 457, in ray_dask_get_sync
    result = ray_get_unpack(object_refs)
  File "/usr/local/Caskroom/miniconda/base/envs/ray-test2/lib/python3.7/site-packages/ray/util/dask/scheduler.py", line 381, in ray_get_unpack
    computed_result = ray.get(object_refs)
  File "/usr/local/Caskroom/miniconda/base/envs/ray-test2/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 61, in wrapper
    return getattr(ray, func.__name__)(*args, **kwargs)
  File "/usr/local/Caskroom/miniconda/base/envs/ray-test2/lib/python3.7/site-packages/ray/util/client/api.py", line 42, in get
    return self.worker.get(vals, timeout=timeout)
  File "/usr/local/Caskroom/miniconda/base/envs/ray-test2/lib/python3.7/site-packages/ray/util/client/worker.py", line 225, in get
    res = self._get(obj_ref, op_timeout)
  File "/usr/local/Caskroom/miniconda/base/envs/ray-test2/lib/python3.7/site-packages/ray/util/client/worker.py", line 248, in _get
    raise err
ray.exceptions.RayTaskError: ray::dask:write_json_partition-be1172be-97d9-45e2-85d2-726504b22e68 (pid=181, ip=
  File "python/ray/_raylet.pyx", line 460, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 481, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 351, in ray._raylet.raise_if_dependency_failed
ray.exceptions.RayTaskError: ray::dask:('assign-a1ecc1c1a981a7ee5ea34bf021e663e3', 0) (pid=187, ip=
  File "python/ray/_raylet.pyx", line 460, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 481, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 351, in ray._raylet.raise_if_dependency_failed
ray.exceptions.ObjectLostError: Object a87b3e758b494778ffffffffffffffffffffffff0500000001000000 is lost due to node failure.

Process finished with exit code 1

Hey @shoelesself, from what I understand this should rely on Dask’s fault tolerance mechanism to regenerate results. @Clark_Zinzow, do you know if this is currently supported?

By the way, we’ve been having a discussion on GitHub about this particular error: "Improve the object loss message" (ray-project/ray#14580) – would love to hear your thoughts.

Hey @shoelesself, object reconstruction is currently turned off by default; we’re planning to turn it on by default after we finish some data plane work in Q3. For now, lost intermediate results won’t be regenerated, so Dask-on-Ray workloads are unfortunately not yet safe to run on spot instances.
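For anyone wondering what “regenerating lost intermediate results” means in practice, here’s a toy sketch of lineage-based reconstruction in plain Python. This is not Ray’s implementation — just the core idea: if the system records which task and inputs produced each object, a lost object can be recomputed on demand, transitively if its inputs were lost too.

```python
# Toy illustration of lineage-based object reconstruction (NOT Ray's code).
store = {}    # object id -> value; entries can vanish on "node failure"
lineage = {}  # object id -> (producing function, input object ids)

def submit(obj_id, fn, *dep_ids):
    """Run fn on the stored inputs and record how the result was produced."""
    lineage[obj_id] = (fn, dep_ids)
    store[obj_id] = fn(*(get(d) for d in dep_ids))
    return obj_id

def get(obj_id):
    """Fetch a value, recomputing it from lineage if it was lost."""
    if obj_id not in store:
        fn, dep_ids = lineage[obj_id]          # re-run the producing task,
        store[obj_id] = fn(*(get(d) for d in dep_ids))  # recursing on lost inputs
    return store[obj_id]

a = submit("a", lambda: [1, 2, 3])
b = submit("b", lambda xs: [v * 2 for v in xs], "a")

del store["a"]   # simulate a node failure losing both intermediates
del store["b"]
print(get("b"))  # reconstructed transitively from lineage: [2, 4, 6]
```

With reconstruction off, the `get` above would instead have to raise something like the `ObjectLostError` in the traceback, since the value is simply gone.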

Btw @rliaw, plain Dask doesn’t have any fault tolerance mechanisms, but Dask Distributed does. Turning on object reconstruction will give Dask-on-Ray approximately the same fault tolerance guarantees as Dask Distributed.

Thanks! I enabled object reconstruction and it seems to be working well. Do you have plans to add a feature similar to Dask Distributed’s replicate, which duplicates data across nodes to guard against loss, much like RAID?

Data currently fate-shares with the Ray driver or worker that created it (the owner of the data), so replicating that data to multiple nodes wouldn’t improve its resilience. We are currently prototyping mechanisms for manual and automatic transfer of ownership from worker to worker. Once we’ve settled on an ownership transfer protocol, we can start looking at automatic ownership failover, where a data copy is promoted to being the new owner. At that point, in addition to providing ownership failover for copies that are a byproduct of task execution (i.e. data that had to be copied to another node because a task there needed it), we would probably offer a ray.replicate(obj, n) API to expose user-directed data replication.
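To make the fate-sharing point concrete, here’s a conceptual sketch (plain Python, not Ray’s actual ownership protocol): every copy of an object resolves through its owner’s metadata, so replicas alone don’t survive the owner’s death; only promoting a surviving node to be the new owner (the hypothetical failover described above) makes the replica reachable again.

```python
# Conceptual sketch of owner fate-sharing (NOT Ray's actual protocol).
class Node:
    def __init__(self, name):
        self.name = name
        self.alive = True
        self.replicas = {}  # object id -> value physically held on this node

class Obj:
    def __init__(self, obj_id, owner, nodes):
        self.obj_id = obj_id
        self.owner = owner  # the worker/driver that created the object
        self.nodes = nodes

    def resolve(self):
        # Location metadata lives with the owner: if the owner is gone,
        # no replica can be located, no matter how many copies exist.
        if not self.owner.alive:
            raise RuntimeError(f"{self.obj_id}: owner lost, object unreachable")
        for node in self.nodes:
            if node.alive and self.obj_id in node.replicas:
                return node.replicas[self.obj_id]
        raise RuntimeError(f"{self.obj_id}: all replicas lost")

    def failover(self, new_owner):
        # Hypothetical ownership failover: promote a surviving node to owner.
        self.owner = new_owner

a, b = Node("a"), Node("b")
a.replicas["x"] = 42
b.replicas["x"] = 42            # replicated to a second node
obj = Obj("x", owner=a, nodes=[a, b])

a.alive = False                 # owner dies: the replica on b doesn't help...
try:
    obj.resolve()
except RuntimeError as e:
    print(e)                    # x: owner lost, object unreachable

obj.failover(b)                 # ...until ownership fails over to b
print(obj.resolve())            # 42
```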

This will most likely be a longer-term effort: the current prototype ownership transfer mechanism assigns a new owner at object creation time (ray.put()) and doesn’t provide a way to transfer ownership of an existing object at runtime, so we’d have to extend this protocol, or move to a different one, before starting to think about ownership failover.