Job Queue in Ray?

I have some questions and thoughts, and sorry for my bad English

  1. As I mentioned, a lot of real jobs will be asynchronous, and I don’t think we need more than 1 core for async tasks.

There are some options like

  • If there are 4 cores, use 1 for async tasks and 3 for blocking tasks (2 queues). The async queue pushes its results to the blocking queue for further processing.

  • 1 queue with 4 cores, where every worker spawns many concurrent actors to perform tasks. This option sounds better to me (see the sketch after this list).

  • … any?
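
Roughly, I picture option 2 looking like the sketch below (just an illustration with made-up names; `fetch_detail` stands in for the real async API call). One async actor multiplexes many concurrent I/O calls on a fraction of a core via `max_concurrency`:

```python
import asyncio
import ray

ray.init()

@ray.remote(num_cpus=0.25)  # async I/O work barely needs a full core
class AsyncWorker:
    async def fetch_detail(self, item_id: str) -> dict:
        # Stand-in for a real aiohttp/httpx request.
        await asyncio.sleep(0.1)
        return {"id": item_id, "ok": True}

# max_concurrency lets the async actor run many coroutines at once.
worker = AsyncWorker.options(max_concurrency=100).remote()
results = ray.get([worker.fetch_detail.remote(str(i)) for i in range(50)])
print(len(results))  # 50
```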

  2. Have you thought about a backend DB to store task results? With message queues we are moving toward a stream-processing style; the result of every single task might be tiny, but there will be millions of tasks.
    The final results need to be stored in a Cloud DB, but this Cloud DB only allows 50 concurrent connections. What should we do?
  • A shared DB session comes to mind, but since we are running in a distributed setup, that is not feasible.

  • So it would be better to buffer the results somewhere and push them to the Cloud DB periodically (see the sketch after this list).
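
What I have in mind for the buffering option is something like this sketch: a single writer actor collects the tiny per-task results and bulk-inserts them periodically, so the Cloud DB sees one connection per flush instead of thousands of workers. `insert_batch` is just a stand-in for the real bulk insert:

```python
import time
import ray

def insert_batch(rows: list[dict]) -> None:
    # Stand-in for a real bulk insert into the Cloud DB (one connection per flush).
    print(f"flushing {len(rows)} rows")

@ray.remote
class ResultBuffer:
    def __init__(self, flush_size: int = 1000, flush_interval_s: float = 30.0):
        self._rows: list[dict] = []
        self._flush_size = flush_size
        self._flush_interval_s = flush_interval_s
        self._last_flush = time.monotonic()

    def add(self, row: dict) -> None:
        self._rows.append(row)
        if (len(self._rows) >= self._flush_size
                or time.monotonic() - self._last_flush > self._flush_interval_s):
            self.flush()

    def flush(self) -> None:
        if self._rows:
            insert_batch(self._rows)
            self._rows = []
        self._last_flush = time.monotonic()

# Every task sends its result here instead of opening its own DB connection:
#   buffer = ResultBuffer.remote()
#   buffer.add.remote({"video_id": "abc", "views": 123})
```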

  3. This is the method I’ve been using to build ETLs; happy to hear ideas/feedback from you guys.
  • For example, say I want to get the likes/views of all videos in a YouTube channel:

  • With previous job queues like Faust, I first call the API (get_channel_detail) to get the list of all videos in the channel. After getting a list of 10 video_ids, I post 10 messages back to the queue to trigger another function; this function (get_video_detail) calls the API to get the details, including the published_date for a video_id. If the video was published 100 days ago, I then need to send 100 more messages to get the daily trend…

    • The advantage of this method is that get_video_detail is covered by the job queue, so it will retry if the server is busy or the worker dies; if 1 video fails, it doesn’t affect the other 9 and we don’t have to rerun all 10… The framework also has its own way to easily define the schedule jobs run on.
    • Disadvantage: a parent task can’t tell whether all of its children’s jobs are done. After sending out 100 messages with (video_id, date), get_video_detail’s mission is done and it is destroyed. You might wonder why this is a problem… A small example: every time we update the data, we only want to append data from the last few days; we don’t want to fetch the whole history and replace it… But for a newly added video, we do need to fetch its whole history, which means the Video table needs an indicator telling whether a video is new or not… To know when to flip that indicator, we need to know whether those 100 jobs are fully done, right? So I need another cumbersome process to check that.

OK, you might now be thinking: why don’t I just let the parent task wait for the children’s results… The problem is that I have no idea how to manage the concurrency… If the server only allows 50 concurrent connections, how can you enforce that? You don’t know in advance how many videos are in a channel, or how many days each video has… The best I can do is tell the Faust engine to limit itself to 50 concurrent workers, but then an awaiting parent task occupies a worker.

  • With Ray core only, I first spawn a blocking Supervisor actor; on this actor I use a BlockingScheduler from APScheduler (to keep the actor alive; the AsyncIOScheduler + await sleep while True looks so ugly to me). Then I use the same method as above: instead of sending messages, I spawn another actor and send 10…100 messages to it concurrently… I still have the same pros and cons as above… But now I can’t even manage the concurrency by any means lol; with RabbitMQ, maybe I could at least limit the global consumption rate? If I set max_concurrency on the SupervisorActor, that won’t affect its grandchildren, right? (See the bounded fan-out sketch after this section.)

=> I would love to hear how I can improve this: how a message queue should be integrated in this case, and what advantages it would bring.
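
For reference, here is a rough sketch of what I mean by letting the parent wait for its children while capping concurrency, using plain Ray tasks and ray.wait to keep at most 50 calls in flight. get_daily_trend and the indicator update are placeholders:

```python
import ray

@ray.remote
def get_daily_trend(video_id: str, date: str) -> dict:
    # Stand-in for the real API call for one (video_id, date) pair.
    return {"video_id": video_id, "date": date}

def backfill_video(video_id: str, dates: list[str], max_in_flight: int = 50) -> list[dict]:
    pending, results = [], []
    for date in dates:
        pending.append(get_daily_trend.remote(video_id, date))
        if len(pending) >= max_in_flight:
            # Block until at least one child finishes, keeping <= max_in_flight in flight.
            done, pending = ray.wait(pending, num_returns=1)
            results.extend(ray.get(done))
    results.extend(ray.get(pending))
    # Every child has finished here, so the parent can safely flip the
    # "is this video new?" indicator (placeholder for the real DB update).
    return results
```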

I first spawn a blocking Supervisor actor; on this actor I use a BlockingScheduler from APScheduler (to keep the actor alive; the AsyncIOScheduler + await sleep while True looks so ugly to me)

Disregard this; I’ve now found a better way. If you run a detached actor and start the AsyncIOScheduler in __init__, everything works nicely as a background service.
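
A rough version of what I mean, in case it helps anyone (run_etl is just a placeholder for the real pipeline entry point):

```python
import ray
from apscheduler.schedulers.asyncio import AsyncIOScheduler

@ray.remote
class SchedulerService:
    def __init__(self):
        # Start the AsyncIOScheduler in __init__, as described above; the actor
        # has async methods, so Ray runs it on an asyncio event loop.
        self.scheduler = AsyncIOScheduler()
        self.scheduler.add_job(self.run_etl, "interval", minutes=30)
        self.scheduler.start()

    async def run_etl(self):
        print("kicking off an ETL run")  # placeholder job

    async def ping(self) -> str:
        return "alive"

# Detached + named, so it outlives the driver and can be looked up later
# with ray.get_actor("scheduler_service").
service = SchedulerService.options(
    name="scheduler_service", lifetime="detached"
).remote()
ray.get(service.ping.remote())
```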

My current workaround:
I have an actor to refresh and distribute access tokens. I’m writing functions on it to limit the number of tokens given out; a rough sketch follows.
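
Roughly, the token actor looks like this sketch (names are made up, and the real version also refreshes the token): workers ask it for a token before calling out and hand it back when done, so at most max_tokens calls are in flight across the whole cluster:

```python
import asyncio
import ray

@ray.remote
class TokenManager:
    def __init__(self, max_tokens: int = 50):
        # One permit per allowed in-flight call across the whole cluster.
        self._sem = asyncio.Semaphore(max_tokens)

    async def acquire(self) -> str:
        await self._sem.acquire()
        return "access-token"  # the real version returns a freshly refreshed token

    async def release(self) -> None:
        self._sem.release()

# Usage from any task or actor:
#   manager = TokenManager.options(name="token_manager", lifetime="detached").remote()
#   token = ray.get(manager.acquire.remote())
#   try:
#       call_api(token)
#   finally:
#       manager.release.remote()
```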

Hey folks! We’re thinking about doing some work in this area! Could you all reach out to beta at anyscale dot com? We’d love to chat a bit more in depth with anyone who is interested in this topic. You can also just ping me on the Ray Slack; I have a similar username there too :slight_smile:


Hi all, we have created this RFC to gather further feedback on future plans here: [RFC] Job queueing functionality with Ray Serve + Workflows · Issue #21161 · ray-project/ray · GitHub

At a high level, Serve+Workflows may be able to fill the gap!