Job Queue in Ray?

For larger amounts of tasks to be processed, it seems like it would be a good fit to have some kind of job queue, from where tasks are assigned to workers or workers eat from the Queue. In particular, this seems to be useful for handling error scenarios, retries, etc. in a robust fashion.

Is there anything like that envisioned for Ray?

2 Likes

The company that’s backing Ray (Anyscale) already uses Ray like a job queue, but the current state of art is not sufficient to implement “legit job queue” (e.g., we probably need to implement HA, which will be implemented in the OSS in next 3~4 months). But I agree this would be a cool new stuff to have on Ray, and this was brought up internally. Would you also mind creating a feature request to our Github page so this can bring more attention?

ah, cool – thanks a lot for sharing.

Re your comment wrt anyscale using ray as a job queue – i guess you’re referring to using ray as part of some anyscale-internal application? If so – based on your practical experience – is it sufficient to be used as a job queue (ignoring non-functionals like HA for now), or is there specific functionality you think is missing?

I think it is missing the ability to persist messages upon failures. I think currently using Ray as a job queue is similar to use celery with Redis (instead of other brokers that can provide strong persistency). It is probably possible to make a good solution, but there’s no best practice right now.

I am curious; what kind of competitive benefits you are seeing from Ray as a job queue (given problems I mentioned are resolved)?

I am curious; what kind of competitive benefits you are seeing from Ray as a job queue (given problems I mentioned are resolved)?

i’m just looking at it from a practical perspective. For example, if i invoke a larger amount of longer-running tasks, they’ll naturally be queued up, assuming there will be some limit to adding more capacity.
Based on that it feels like the lines are a bit blurring with the need for some kind of queue. Not sure whether your view of the scope of Ray is that it would include such a functionality in a robust fashion, or whether you’d assume such functionality to reside outside of Ray.

WDYT?

In my opinion, it is something that should be built on top of Ray (so we implement fault-tolerant & durable queue on top of Ray to support this type of workload). Ray can support some functionalities that could help to build this type of library that requires durability (though I don’t know what we will need until we actually build a job queue on top of Ray).

thx for sharing your thoughts. i think it’s a bit of a blurry area – where would you draw the line? when i invoke a number of tasks and use the built-in monitoring, i see sth like ‘n tasks pending’ – which feels like a queue :slight_smile: ? when putting a queue in front of ray, how would the tasks i put into that queue passed on to Ray for processing?

The immediate idea coming to my head is to have a separate queue for durability (e.g., zeromq), and you can spawn a separate server or actors that pull messages from that queue (and publish the result back to the queue so that the consumer can use them). And you can monitor the queue for seeing the state.

yeah, that would certainly be a possibility. A downside of this is that I’d have to craft these things by myself, vs the nice convenience i have today where i can just invoke tasks as often as i like, they go automatically in pending state if there is no immediate capacity available, and become active as capacity becomes available.

Hi @sangcho , just see this thread.
As a BI company, we build many ETL to serve the data warehouse.
I had a chance to experience most libraries out there, let me list some:

  • Airflow: The first to think of, but it’s too heavy… and it doesn’t have a native executor. The best executor choice here is Celery which I don’t like, I’ll tell you later why.

  • Celery: The most popular worker I think. It’s superb to execute parallel tasks, but it’s bad when it comes to concurrent tasks (which are the major of ETL jobs).

Celery does have greenlet workers, but still not my choice anyway. First, those greenlet workers are buggy, you can’t call external commands (such as shutdown, restart…).
Second, greenlet doesn’t return the result of the job in an async style (first done first return)
Third, greenlet doesn’t work with async functions. So for calling apis, you can’t use aiohttp but requests instead. The greenlet will try to monkeypatch the normal function to async one… But you can’t control, you are not aware whether a function is able to be patched or not… so not a solid choice…

  • Faust: this one is great, use kafka, it’s perfect for async tasks… again it uses single core…
    In case of Celery, I can create a prefork worker and a greenlet worker and pass tasks around… sound a little bit stupid but still work haha… In case of Faust I can’t, there is no parallel worker option…
    What if I have a computer with 16 cores, should I start 16 instances of concurrent worker? How can I manage their health? A Cluster Manager is important which is not available.
    Kafka is a bit heavy too( Kafka+zookeeper eat ~1gb ram on average, while rabbitmq ~100mb and Redis ~ 10mb)

  • Arq: very promising but not so many stars… I’m not confident enough to use it in our production work.


Back to RAY. What I really like:

  • Super lightweight and easy to config. A don’t need another Container for the message queue. I believe you guys do use queues(Redis?) to pass the message around actors, but the end-user is care-free of that => good point.
  • The flexibility to swap between async/normal tasks. I shared my pain above… with Ray, you just need to add the async function in the actor. The result can be either ray.get or await => so this feature is really a dream to me.
  • I’m a fan of reactive programming, so I love actors, I love Akka… Although Ray actor is not as fully featured as Akka, using Python is a strong advantage (in term of integration and HR)
  • One of the most complete RPC I’ve ever seen in Python. With all other frameworks above, you need to define the functions in advance on the host… Ray helps me to separate the development process and the worker server.
  • … some more that I don’t remember lol
1 Like

So what we are doing right now is using another scheduler to call ray tasks.
It would be awesome if Ray has a job queue itself

Internally, there are some parts that we are handling manually:

  • retry policy: Ray has retry, but only if the worker dies. For jobs such as calling API requests, the server can go down whenever it feels like to lol… It’s not our fault, but we need to retry after some minutes.

  • rate limiter: I understand there is no point to have this feature now. But it’s a must for the Job Queue

  • DAG

  • Tasks monitor: We are using LOG to monitor this, would be great if it is integrated to the Dashboard

Hey @marrocksd Thanks for the great analysis! :slight_smile: !!

I have a couple questions here.

  1. You brought up “lightweight” as one of benefits of Ray. If we’d like to support the job queue, one of possible implementation is to rely on other queueing implementation (e.g., zeromq like Celery) to support durability & fault tolerance. Do you think this can affect your decision if we require to introduce extra dependency?
  2. Also, you mention Ray actors are not as fully featured as Akka. I am curious what features we are missing.

Also, to be clear about the progress of this, it is not in our radar at the current point, but we can definitely consider to develop one in the future if there are requirements (and strong arguments about benefits like your analysis!)

  1. So far we all know that it’s hard and painful to build a resilient job queue without a message queue, so I don’t mean to live without it.
  • I’m using RabbitMQ for my applications now and it’s totally acceptable (just around 100mb of ram/docker for normal load).

  • Redis is super lightweight. I tried and never had a problem with it but I heard some bad things about its fault tolerance ability, so I don’t want to take the risk (we are an outsourcing company so we are very sensitive of this lol)

  • I didn’t try ZeroMQ before, but after a quick look, I believe it’s lightweight enough. As I understand(please correct if I’m wrong), I don’t even need to host a ZeroMQ instance… somehow you guys could even integrate ZeroMQ to the core of ray… that’s would be so great… I can’t ask for more

  • Kafka+Zookeeper is too heavy. I know it’s famous for the speed and resilience.
    I used Faust+Kafka for some projects, run well on docker-compose service with 2GB of ram… of course 90% of jobs are IO-bound… not much space left for python
    Another problem, we can’t find an official Kafka image.
    I still think that Kafka is better for data streaming… We don’t need that much power for a job queue I believe.

  • In general, I think that the message queue is a must, just be sure to make a good choice for the queue. Btw, clients love the cloud… so it should be taken into consideration too… Some libs out there support AWS SQS, but no one supports Azure… I know some developers don’t like MS’s stuff but most of my US clients really do…

  1. I solo built some ETL applications with Akka, they have Scheduler, Limiter… mostly because of my personal favor
    But now I’m running a business, so I have been using Python, it is a much better option by any means. I have tried many different libraries/ frameworks. I have never stopped researching for more advanced solutions every day. Ray is a masterpiece framework that I can rely on to build scalable business apps. It makes no sense that I’m using another job queue library just to call Ray tasks on schedule but I still accept because Ray is serving me so well
1 Like

Would be fantastic if there was a rightup for devops/distributed newbies to develop this kind of infrastructure.

1 Like

Good point! For the immediate solution, we have a prototype code developed from our internal hackathon. @architkulkarni can you share that as a reference pointer?

cool – very curious to see that!

cc @architkulkarni in case you missed it.

Hi, sorry for the delay and thanks for the reminder Sang! Here’s a link to the code: README.md · GitHub. Here we use a single RabbitMQ worker to launch all of the Ray tasks, one task for each job. The queue is durable, so that it survives a restart of the RabbitMQ server and of the RabbitMQ worker. RabbitMQ waits for an “ack” from each completed job, so it also survives a restart of the Ray cluster without losing any jobs.
cc @nikitavemuri @Dmitri who also worked on this

2 Likes

@architkulkarni ty, that is awesome, simple and readable but effective. A great place for beginner to add more features as they need!