Use Dagster with Ray

How severe does this issue affect your experience of using Ray?

  • None: Just asking a question out of curiosity

Please share your experience of using Dagster together with Ray!

1 Like

also interested in this!

Related thread: Slack

For better visibility, below my tiny sample implementation of Dagster + Ray using Dagster Resources, based on the discussion on the above mentioned Slack thread.

Disclaimer: quite new to Dagster, so there might be better ways of doing this or it might have its limitations. Works on my pc though!

import ray
from dagster import asset, Definitions, ConfigurableResource


@ray.remote
def square(x):
    return x * x


@ray.remote
def double(x):
    return x + x


class RayResource(ConfigurableResource):
    address: str = None
    ignore_reinit_error: bool = True

    def connect(self) -> None:
        ray.init(address=self.address,
                 ignore_reinit_error=self.ignore_reinit_error)


@asset
def run_squares(ray_conn: RayResource):
    ray_conn.connect()
    futures = [square.remote(i) for i in range(5000)]
    print(ray.get(futures))


@asset(deps=[run_squares])
def run_doubles(ray_conn: RayResource):
    ray_conn.connect()
    futures = [double.remote(i) for i in range(4000)]
    print(ray.get(futures))


defs = Definitions(
    assets=[run_squares, run_doubles],
    resources={
            "ray_conn": RayResource(address="ray://127.0.0.1:10001",
                                    ignore_reinit_error=True),
        },
)