For larger numbers of tasks to be processed, it seems like it would be a good fit to have some kind of job queue, from which tasks are assigned to workers or workers pull from the queue. In particular, this seems useful for handling error scenarios, retries, etc. in a robust fashion.
The company that's backing Ray (Anyscale) already uses Ray like a job queue, but the current state of the art is not sufficient to implement a "legit job queue" (e.g., we probably need to implement HA, which will land in the OSS in the next 3-4 months). But I agree this would be a cool new thing to have on Ray, and this was brought up internally. Would you also mind creating a feature request on our GitHub page so this can get more attention?
Re your comment about Anyscale using Ray as a job queue: I guess you're referring to using Ray as part of some Anyscale-internal application? If so, based on your practical experience, is it sufficient to be used as a job queue (ignoring non-functionals like HA for now), or is there specific functionality you think is missing?
I think it is missing the ability to persist messages upon failures. Currently, using Ray as a job queue is similar to using Celery with Redis (instead of other brokers that provide strong persistence). It is probably possible to build a good solution, but there's no best practice right now.
I am curious: what kind of competitive benefits are you seeing from Ray as a job queue (assuming the problems I mentioned are resolved)?
I'm just looking at it from a practical perspective. For example, if I invoke a large number of longer-running tasks, they'll naturally queue up, assuming there is some limit to adding more capacity.
Based on that, it feels like the lines blur a bit with the need for some kind of queue. I'm not sure whether your view of the scope of Ray is that it would include such functionality in a robust fashion, or whether you'd expect such functionality to live outside of Ray.
In my opinion, it is something that should be built on top of Ray (so we implement a fault-tolerant and durable queue on top of Ray to support this type of workload). Ray can support some functionality that would help to build this kind of library that requires durability (though I don't know exactly what we will need until we actually build a job queue on top of Ray).
Thanks for sharing your thoughts. I think it's a bit of a blurry area: where would you draw the line? When I invoke a number of tasks and use the built-in monitoring, I see something like "n tasks pending", which feels like a queue, no? When putting a queue in front of Ray, how would the tasks I put into that queue be passed on to Ray for processing?
The immediate idea that comes to mind is to have a separate queue for durability (e.g., ZeroMQ), and you can spawn a separate server or actors that pull messages from that queue (and publish the results back to the queue so that the consumer can use them). And you can monitor the queue to see the state.
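To make that idea concrete, here is a minimal sketch (not from the thread) of Ray actors pulling jobs from an external ZeroMQ socket and pushing results back. It assumes pyzmq, a PUSH/PULL pair, and hypothetical addresses tcp://localhost:5557 and 5558; the "payload" handling is a placeholder for real work.

```python
import ray
import zmq

ray.init()

@ray.remote
class QueueConsumer:
    def __init__(self, pull_addr="tcp://localhost:5557", push_addr="tcp://localhost:5558"):
        ctx = zmq.Context()
        self.jobs = ctx.socket(zmq.PULL)       # incoming job messages
        self.jobs.connect(pull_addr)
        self.results = ctx.socket(zmq.PUSH)    # outgoing results
        self.results.connect(push_addr)

    def run(self, max_messages=100):
        # Pull messages, do the work, and publish results back to the queue.
        for _ in range(max_messages):
            job = self.jobs.recv_json()
            result = {"id": job["id"], "output": job["payload"].upper()}  # placeholder work
            self.results.send_json(result)

# Spawn a few consumers that drain the queue in parallel.
consumers = [QueueConsumer.remote() for _ in range(4)]
ray.get([c.run.remote() for c in consumers])
```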
Yeah, that would certainly be a possibility. A downside is that I'd have to build these things myself, versus the nice convenience I have today where I can just invoke tasks as often as I like, they automatically go into the pending state if there is no immediate capacity available, and become active as capacity frees up.
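For reference, the convenience being described is just plain task submission: excess tasks sit in Ray's internal scheduling queue until resources free up. A small illustrative sketch (the 4-CPU limit and sleep duration are arbitrary):

```python
import time
import ray

ray.init(num_cpus=4)  # limit capacity so extra tasks stay pending

@ray.remote(num_cpus=1)
def long_task(i):
    time.sleep(60)  # stand-in for a longer-running job
    return i

# Only 4 tasks run at once; the remaining 96 show up as pending
# in the dashboard until capacity becomes available.
refs = [long_task.remote(i) for i in range(100)]
results = ray.get(refs)
```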
Hi @sangcho, just saw this thread.
As a BI company, we build many ETL pipelines to serve the data warehouse.
I had a chance to try most of the libraries out there; let me list some:
Airflow: the first one that comes to mind, but it's too heavy, and it doesn't have a native executor. The best executor choice here is Celery, which I don't like; I'll tell you later why.
Celery: the most popular worker, I think. It's superb at executing parallel tasks, but it's bad when it comes to concurrent tasks (which make up the majority of ETL jobs).
Celery does have greenlet workers, but they are still not my choice. First, those greenlet workers are buggy; you can't call external commands (such as shutdown, restart, etc.).
Second, greenlet doesn't return job results in an async style (first done, first returned).
Third, greenlet doesn't work with async functions, so for calling APIs you can't use aiohttp, only requests. The greenlet worker will try to monkeypatch normal functions into async ones, but you can't control it and you can't tell whether a given function can be patched or not, so it's not a solid choice.
Faust: this one is great; it uses Kafka and it's perfect for async tasks. But again, it uses a single core.
With Celery, I can create a prefork worker and a greenlet worker and pass tasks around; it sounds a little silly, but it still works, haha. With Faust I can't: there is no parallel worker option.
What if I have a machine with 16 cores, should I start 16 instances of the concurrent worker? How would I manage their health? A cluster manager is important, and it isn't available.
Kafka is a bit heavy too (Kafka + ZooKeeper eat ~1 GB of RAM on average, while RabbitMQ needs ~100 MB and Redis ~10 MB).
Arq: very promising, but it doesn't have many stars; I'm not confident enough to use it in our production work.
Back to Ray. What I really like:
Super lightweight and easy to configure. I don't need another container for the message queue. I believe you do use a queue (Redis?) to pass messages around between actors, but the end user doesn't have to care about that => good point.
The flexibility to swap between async and normal tasks. I shared my pain above; with Ray, you just add the async function to the actor, and the result can be retrieved with either ray.get or await => this feature is really a dream to me (see the sketch after this list).
I'm a fan of reactive programming, so I love actors and I love Akka. Although Ray actors are not as fully featured as Akka's, using Python is a strong advantage (in terms of integration and hiring).
One of the most complete RPC systems I've ever seen in Python. With all the other frameworks above, you need to define the functions in advance on the host; Ray lets me separate the development process from the worker server.
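To illustrate the async/sync flexibility mentioned above, here is a small sketch using Ray's async actors. The Fetcher class and the asyncio.sleep stand-in are illustrative only (a real use would call aiohttp there); the same remote method can be consumed with ray.get or awaited.

```python
import asyncio
import ray

ray.init()

@ray.remote
class Fetcher:
    async def fetch(self, url):
        # Async method runs on the actor's asyncio event loop.
        await asyncio.sleep(0.1)  # stand-in for an aiohttp request
        return f"fetched {url}"

fetcher = Fetcher.remote()

# Blocking style: ray.get waits for the result.
print(ray.get(fetcher.fetch.remote("https://example.com")))

# Async style: ObjectRefs are awaitable inside a coroutine.
async def main():
    result = await fetcher.fetch.remote("https://example.com")
    print(result)

asyncio.run(main())
```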
So what we are doing right now is using another scheduler to call Ray tasks.
It would be awesome if Ray had a job queue itself.
Internally, there are some parts that we are handling manually:
retry policy: Ray has retries, but only if the worker dies. For jobs such as API requests, the server can go down whenever it feels like it, lol. It's not our fault, but we need to retry after a few minutes (a sketch of application-level retries follows this list).
rate limiter: I understand there is no point in having this feature now, but it's a must for a job queue.
DAG
Task monitoring: we are using logs to monitor this; it would be great if it were integrated into the Dashboard.
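As an aside on the retry point above, application-level retries can already be layered inside a Ray task today. A rough sketch, where call_api_with_retry and its backoff parameters are hypothetical, not an existing Ray feature:

```python
import time
import ray
import requests

ray.init()

@ray.remote(max_retries=0)  # Ray's built-in retry only covers worker failures
def call_api_with_retry(url, attempts=5, base_delay=1.0):
    """Retry transient server errors with exponential backoff (illustrative helper)."""
    for attempt in range(attempts):
        try:
            resp = requests.get(url, timeout=10)
            resp.raise_for_status()
            return resp.json()
        except requests.RequestException:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)  # wait longer after each failure

result = ray.get(call_api_with_retry.remote("https://example.com/api"))
```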
You brought up "lightweight" as one of the benefits of Ray. If we'd like to support a job queue, one possible implementation is to rely on another queueing implementation (e.g., ZeroMQ, similar to how Celery relies on a broker) to support durability and fault tolerance. Would it affect your decision if we had to introduce an extra dependency?
Also, you mention Ray actors are not as fully featured as Akka's. I am curious which features we are missing.
Also, to be clear about the progress of this: it is not on our radar at the moment, but we can definitely consider developing one in the future if there is demand (and strong arguments about the benefits, like your analysis!)
So far we all know that it's hard and painful to build a resilient job queue without a message queue, so I don't expect to live without one.
I'm using RabbitMQ for my applications now and it's totally acceptable (around 100 MB of RAM per Docker container under normal load).
Redis is super lightweight. I tried it and never had a problem with it, but I've heard some bad things about its fault-tolerance capabilities, so I don't want to take the risk (we are an outsourcing company, so we are very sensitive about this, lol).
I haven't tried ZeroMQ before, but after a quick look I believe it's lightweight enough. As I understand it (please correct me if I'm wrong), I don't even need to host a ZeroMQ instance; you could even integrate ZeroMQ into the core of Ray. That would be so great; I couldn't ask for more.
Kafka + ZooKeeper is too heavy, even though I know it's famous for speed and resilience.
I used Faust + Kafka for some projects; it ran well as a docker-compose service with 2 GB of RAM. Of course, 90% of the jobs are IO-bound, so there's not much room left for Python.
Another problem: we can't find an official Kafka image.
I still think Kafka is better suited for data streaming; we don't need that much power for a job queue, I believe.
In general, I think a message queue is a must; just be sure to make a good choice for the queue. By the way, clients love the cloud, so that should be taken into consideration too. Some libraries out there support AWS SQS, but none support Azure. I know some developers don't like Microsoft's stuff, but most of my US clients really do.
I solo-built some ETL applications with Akka; they have a Scheduler, a Limiter, and so on, mostly out of personal preference.
But now I'm running a business, so I have been using Python, which is a much better option by any measure. I have tried many different libraries and frameworks, and I never stop researching more advanced solutions. Ray is a masterpiece of a framework that I can rely on to build scalable business apps. It makes no sense that I'm using another job queue library just to call Ray tasks on a schedule, but I accept it because Ray serves me so well.
Good point! For an immediate solution, we have prototype code developed during our internal hackathon. @architkulkarni, can you share that as a reference pointer?
Hi, sorry for the delay and thanks for the reminder, Sang! Here's a link to the code: README.md · GitHub. Here we use a single RabbitMQ worker to launch all of the Ray tasks, one task for each job. The queue is durable, so it survives a restart of the RabbitMQ server and of the RabbitMQ worker. RabbitMQ waits for an "ack" from each completed job, so it also survives a restart of the Ray cluster without losing any jobs.
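For readers who want the shape of that pattern without opening the link, here is a rough sketch (assuming pika and a local RabbitMQ; the run_job task and the "jobs" queue name are placeholders and not necessarily what the prototype uses). The key point is acking only after the Ray task completes, so unacked jobs are redelivered if the Ray cluster restarts.

```python
import pika
import ray

ray.init()

@ray.remote
def run_job(payload):
    # Placeholder for the actual work done for one job.
    return f"done: {payload}"

def on_message(channel, method, properties, body):
    # Block until the Ray task finishes, then ack so RabbitMQ can drop the message.
    result = ray.get(run_job.remote(body.decode()))
    print(result)
    channel.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="jobs", durable=True)  # queue survives a broker restart
channel.basic_qos(prefetch_count=1)                # one in-flight job per worker
channel.basic_consume(queue="jobs", on_message_callback=on_message)
channel.start_consuming()
```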
cc @nikitavemuri @Dmitri, who also worked on this