How severely does this issue affect your experience of using Ray?
- Medium: It contributes to significant difficulty to complete my task, but I can work around it.
Hello,
I’m currently trying to implement an embarrassingly parallel feature extraction job using Ray. I have a large dataset (~10M items) where each item is stored as a raw data file in S3. Feature extraction is performed by a simple Python function that takes a file as its input and outputs a Python object (in this case, a list of dictionaries). I’ve implemented feature extraction on a single file as a Ray task and can naively run my job as follows:
```python
@ray.remote(num_cpus=1)
def feat_extraction(s3_key):
    # step 1: read file from S3
    # step 2: extract list of feature dictionaries
    # step 3: return list of feature dictionaries
    return features

output = ray.get([feat_extraction.remote(s3_key) for s3_key in s3_keys])
```
This works fine when testing on a subset of the files, but the problem is that our dataset has a long tail in the amount of memory an individual feature extraction can require. Over 99% of the feature extractions require < 1 GB of memory; however, a small fraction can require as much as ~10 GB.
To prevent my job from becoming bottlenecked by the network, and to be efficient with my resource usage, I wanted to implement a solution like the following:
```python
@ray.remote(num_cpus=1)
class Supervisor:
    def __init__(self, s3_keys):
        self.s3_keys = s3_keys

    def do_task(self, s3_key):
        try:
            # attempt the task on the small nodegroup, with Ray's default retries disabled
            return feat_extraction.options(resources={"small-node": 1}, max_retries=0).remote(s3_key)
        except (ray.exceptions.WorkerCrashedError, ray.exceptions.RayTaskError):
            # fall back to the large nodegroup if the first attempt fails
            return feat_extraction.options(resources={"large-node": 1}).remote(s3_key)

    def work(self):
        return ray.get([self.do_task(s3_key) for s3_key in self.s3_keys])

sup = Supervisor.options(resources={"actor-node": 1}).remote(s3_keys)
out = ray.get(sup.work.remote())
```
Here I’m using the `resources` field to schedule the tasks / actor onto specific nodegroups in my Kubernetes cluster: `"small-node"` corresponds to a nodegroup of machines with enough memory to process the 99% of files that take < 1 GB of memory, and `"large-node"` corresponds to a nodegroup of machines with much more memory that can handle the corner cases where up to 10 GB may be required.
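(For reference, each nodegroup advertises its custom resource when its Ray workers start, e.g. via `ray start --resources='{"small-node": 1}'` in that worker group’s start params. A quick check like the sketch below, with illustrative totals, confirms the resources are visible to the scheduler.)

```python
import ray

ray.init(address="auto")

# Sanity check: the custom resources declared on each nodegroup should appear
# in the cluster's aggregate resources. Totals below are illustrative only.
print(ray.cluster_resources())
# e.g. {'CPU': 640.0, ..., 'small-node': 20.0, 'large-node': 2.0, 'actor-node': 1.0}
```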
The problem is that the code I’ve implemented above does not do what I want. It seems that the `RayTaskError` just blows through my error-handling logic (presumably because the exception only surfaces when `ray.get` is called on the returned ObjectRef, not when the task is submitted), and I’ve been unable to come up with a way to force my tasks to be retried on the larger nodegroup when they fail. I don’t believe my use case is all that uncommon, so I’m wondering: is there some way Ray can support the more fine-grained control of task retries that I’m seeking?
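To make the intent concrete, here is a rough sketch (not a working proposal, just the behavior I’m after) in which the failure is caught when the result is fetched and the key is resubmitted to the large nodegroup:

```python
def extract_with_fallback(s3_key):
    # First attempt on the small nodegroup, with Ray's own retries disabled.
    ref = feat_extraction.options(resources={"small-node": 1}, max_retries=0).remote(s3_key)
    try:
        return ray.get(ref)
    except (ray.exceptions.WorkerCrashedError, ray.exceptions.RayTaskError):
        # The task (or its worker) died, most likely due to memory pressure,
        # so resubmit the same key to the large nodegroup.
        ref = feat_extraction.options(resources={"large-node": 1}).remote(s3_key)
        return ray.get(ref)
```

Blocking on `ray.get` per key like this would obviously kill throughput, so in practice it would need to be driven with something like `ray.wait` inside the supervisor; I’m hoping there is a cleaner way to express this fallback natively.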
I’d be happy to provide more details if needed, and am hoping that there’s a solution for this as I’m really excited about switching my data science team’s workloads from PySpark to Ray!