Ray / gRPC Ambiguous Error Message

Hi there,

I recently started using Ray at work to run large parallel computations in Python. I have a Kubernetes cluster (running on EKS), and I installed the Ray Helm chart using the Ray Docker image for Python 3.7 and Ray 1.11.0.

I’ve been intermittently running into the following error when calling ray.get on the futures of a simple task that reads a set of e.g. 100 JSON files from S3 and combines them into a single JSON file:

Traceback (most recent call last):
....
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_on_k8s/tasks/lib/data_transformers.py", line 569, in ray_combine_files
    small_file_mappings = ray.get(small_file_futures)
  File "/home/airflow/.local/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 104, in wrapper
    return getattr(ray, func.__name__)(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/ray/util/client/api.py", line 42, in get
    return self.worker.get(vals, timeout=timeout)
  File "/home/airflow/.local/lib/python3.7/site-packages/ray/util/client/worker.py", line 359, in get
    res = self._get(to_get, op_timeout)
  File "/home/airflow/.local/lib/python3.7/site-packages/ray/util/client/worker.py", line 379, in _get
    raise decode_exception(e)
ConnectionError: GRPC connection failed: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.NOT_FOUND
	details = "Failed to serialize response!"
	debug_error_string = "{"created":"@1650490482.895831759","description":"Error received from peer ipv4:10.100.243.149:10001","file":"src/core/lib/surface/call.cc","file_line":1074,"grpc_message":"Failed to serialize response!","grpc_status":5}"

What’s really weird (and bothering me) is that this error is non-deterministic. I run my data pipelines in Airflow, which automatically retries a job on failure, and I’ve had the job fail initially and then succeed on the retry. There’s nothing randomized about my task / code, so this leads me to believe that the error has something to do with the Ray head’s communication with the workers.

Has anyone run into this issue before, or have insight into what might be the cause? This post seems to describe the same issue, but it was never resolved: Failure to serialize response.

I would be more than happy to provide more details and answer any questions about my setup that might help get to the root cause, so please don’t hesitate to ask.

(FWIW, I’m guessing this is something like a timeout, a node or pod dying unexpectedly, or the head’s connection to the service being dropped somehow, but I’m not even sure how to begin debugging this issue.)

I’m not sure if this is what ultimately fixed the problem, but I prepended export AUTOSCALER_MAX_NUM_FAILURES=inf; to my Ray head start commands in the raycluster.yaml file (as suggested here: Cluster Deployment Guide — Ray 1.12.0) and I was ultimately able to finish my job successfully. (I also upgraded from Ray version 1.11.0 → 1.12.0).

I had seen autoscaler failure / error messages in the Ray operator logs for the last job I’d run that previously failed with this gRPC error message, so there’s some reason to believe this might have been the solution.

Perhaps @Dmitri can help with this one?

Sounds like this is working well enough now.

I’m not sure if this is what ultimately fixed the problem, but I prepended export AUTOSCALER_MAX_NUM_FAILURES=inf; to my Ray head start commands in the raycluster.yaml

The Helm chart doesn’t expose Ray head start commands. I’m just curious – did you modify the template for the Helm chart or did you go with another deployment stack?

Yeah, I haven’t run into this issue (that I can remember) in a few days now, so I think this piece is resolved. I’m still having some difficulty getting long-running (i.e. ~6-10 hour) jobs to finish. For example, for a job with ~190k tasks my Ray head pod took up >64 GiB of memory, and I’m not sure whether that’s normal.

As for your question, I simply edited the Ray head start command in the template! I just had to delete my Ray deployment and remove some finalizers before re-deploying.


Two things that would be massively helpful for me to know:

  1. If I have e.g. 100K tasks and 1K worker pods, does Ray statically assign 100 tasks to each worker when they’re submitted? Or does the scheduler re-balance the tasks across workers dynamically as they finish executing their current task(s)?

  2. Is there an easy way to configure the Helm chart to export Prometheus metrics? It seemed to me like the only way to do this is:

  1. Add e.g. --metrics-export-port=8080 to the Ray head start command
  2. Add this port as a containerPort under the podConfig in the raycluster.yaml template
  3. ???
  4. Configure Prometheus to scrape K8s service endpoints

I think making something like this a configurable switch in the values.yaml might be a nice touch. I still haven’t gotten it to work, but I think I’m fairly close.

  1. If I have e.g. 100K tasks and 1K worker pods, does Ray statically assign 100 tasks to each worker when they’re submitted? Or does the scheduler re-balance the tasks across workers dynamically as they finish executing their current task(s)?
  • It’s the latter. Dynamic and partly decentralized scheduling is a defining characteristic of Ray that distinguishes it from related systems such as Dask.
  • … do you actually have 100K tasks and 1K worker pods? I believe Ray’s official scalability limit is 250 nodes (Ray pods in this context).

for a job with ~190k tasks my Ray head pod took up >64 GiB of memory, and I’m not sure whether that’s normal

You might already be doing this, but for a heavy workload it’s important to make sure none of the tasks are scheduled on the head pod – that can be achieved by annotating the head pod as having 0 CPUs (via the rayResources field). The tasks should correspondingly be marked as using CPU (a bare @ray.remote is equivalent to @ray.remote(num_cpus=1)).
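For concreteness, the task side would look roughly like this (a sketch; the function name and argument are just placeholders):

import ray

# With the head podType annotated as rayResources: {"CPU": 0}, a task that
# requests CPU can only be scheduled on worker pods, never on the head.
@ray.remote(num_cpus=1)  # a bare @ray.remote defaults to num_cpus=1 anyway
def extract_features(s3_key):
    ...  # placeholder body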

Besides that, it would be very interesting to profile the head pod and see which processes are eating the memory (and let me know what you see 🙂).

  1. Is there an easy way to configure the Helm chart to export Prometheus metrics?

I think something along the lines of what you’re suggesting is the best that can be done at the moment. The functionality exposed by the Helm chart is pretty bare-bones; the de facto standard for adding more functionality has been to manually edit the template. That said, if you’re able to figure this out, feel free to open a Ray pull request!

do you actually have 100K tasks and 1K worker pods?

I’m performing feature extraction on ~190K files in S3, where each file contains time-series data I want to extract features from and the feature extraction per file takes on the order of tens to hundreds of seconds. I’m not sure if it would help to chunk these files further (e.g. by a factor of 10), so that I would only have ~19k tasks, with the tradeoff being that each task would take longer.

I started by trying to use 900 workers on 450 c5.xlarge instances (2 physical CPUs per instance), but I’m having trouble getting EKS to scale up to that size. Furthermore, I did notice that many workers seemed to have low resource utilization, so maybe decreasing the number of Ray worker pods to 250 would also help.

The big challenge I’m facing is that I’m reading over 1 TB of data from S3 for each job (these 190K files are ~10-20 MB each), so I’m trying to maximize bandwidth by using a large number of small machines. That being said, my feature extraction can be parallelized (each file contains ~40-50 individual time-series that can be processed in parallel), so I can try swapping my CPU bottleneck for a network bottleneck by running on fewer machines that have more cores.

You might already be doing this, but for a heavy workload it’s important to make sure none of the tasks are scheduled on the head pod

I did set rayResources: {"CPU": 0} in my values.yaml, so I think this might be something worth profiling. I was wondering if the Ray head manages some state for the number of outstanding tasks that might just be growing incrementally over time?

Also as a related question, I have the following setup for my worker nodes:

raySmallWorkerType:
        # minWorkers is the minimum number of Ray workers of this pod type to keep running.
        minWorkers: 0
        # maxWorkers is the maximum number of Ray workers of this pod type to which Ray will scale.
        maxWorkers: 900  # 450 * 2 (use number of physical CPUs)
        # memory is the memory used by this Pod type.
        # (Used for both requests and limits.)
        memory: 3Gi
        # CPU is the number of CPUs used by this pod type.
        # (Used for both requests and limits. Must be an integer, as Ray does not support fractional CPUs.)
        CPU: 1
        # GPU is the number of NVIDIA GPUs used by this pod type.
        # (Optional, requires GPU nodes with appropriate setup. See https://docs.ray.io/en/master/cluster/kubernetes-gpu.html)
        GPU: 0
        # rayResources is an optional string-int mapping signalling additional resources to Ray.
        # "CPU", "GPU", and "memory" are filled automatically based on the above settings, but can be overriden;
        # For example, rayResources: {"CPU": 0} can be used in the head podType to prevent Ray from scheduling tasks on the head.
        # See https://docs.ray.io/en/master/advanced.html#dynamic-remote-parameters for an example of usage of custom resources in a Ray task.
        rayResources: {"small-node": XXXX}

Should XXXX be 900 or 1? I.e., does this define the amount of the resource that a single pod provides, or the total that this pod type would provide once scaled to maxWorkers? (Right now I’m assuming it’s the latter.)

That said, if you’re able to figure this out, feel free to open a Ray pull request!

I would love to! Once I tackle getting this feature extraction to run in a stable fashion I can definitely take a crack at adding this to the helm chart.

1 is right – rayResources describes the resources provided by each individual pod of that type, not the total across all pods.
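So with rayResources: {"small-node": 1}, each worker pod advertises one unit of that resource, and a task can claim a whole pod along these lines (a sketch; the task name is a placeholder):

import ray

# Consumes the pod's single "small-node" unit, so at most one such task
# runs on each pod of that type at a time.
@ray.remote(resources={"small-node": 1})
def process_chunk(s3_key):
    ...  # placeholder body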

started by trying to use 900 workers on 450 c5.xlarge instances

My first instinct says that 450 2-CPU worker nodes, each taking up an entire instance, could work better. That way there are fewer Ray nodes to track, fewer Ray processes running overall (half as many raylets), and less communication overhead.

Ray head manages some state for the number of outstanding tasks that might just be growing incrementally over time

There would definitely be some queued-up state in the head node. I don’t have the expertise to give an order-of-magnitude estimate on the resulting memory footprint.

reading over 1 TB of data from S3 for each job

Let me call over some colleagues with knowledge of the Ray internals and data processing.
cc @Clark_Zinzow


One more question regarding this:

My first instinct says that 450 2-CPU worker nodes, each taking up an entire instance, could work better

What is the most Ray-idiomatic way to parallelize across CPUs within a single Ray worker? E.g. I tried using multiprocessing.Pool with processes=2, and I’m having trouble getting the function I want to apply in parallel to pickle correctly. Concretely, I get this error when trying to do something like the following:

import ray
from multiprocessing import Pool

def do_stuff(key, value):
    # do cool things here
    ...

@ray.remote(num_cpus=2)
def feat_extraction(full_data):
    with Pool(processes=2) as pool:
        results = pool.starmap(do_stuff, [(key, value) for key, value in full_data.items()])
    return results

And the error I get is:

_pickle.PicklingError: Can't pickle <function do_stuff at 0x7f436ceeb8c0>: attribute lookup do_stuff on __main__ failed

Would I have to use Ray Actor(s) to accomplish this?

(Note: Right now I’m triggering calls to feat_extraction from inside a script under if __name__ == "__main__":; in production I wouldn’t be doing this, but ideally I could find a solution that works in either situation.)

There’s a Ray-based replacement for multiprocessing.Pool:
https://docs.ray.io/en/latest/ray-more-libs/multiprocessing.html
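Roughly like this (an untested sketch; do_stuff mirrors the function from your snippet and full_data is placeholder data):

from ray.util.multiprocessing import Pool

def do_stuff(item):
    key, value = item
    # do cool things here
    return key

full_data = {"a": 1, "b": 2}  # placeholder for the real data

# Each Pool worker is a Ray actor, so the map can spread across the cluster
# instead of being limited to one Ray worker's local CPUs.
pool = Pool()
results = pool.map(do_stuff, list(full_data.items()))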

There’s also such a thing as a Ray actor pool
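A minimal sketch of that (class and method names are placeholders):

import ray
from ray.util import ActorPool

@ray.remote
class FeatureWorker:
    def do_stuff(self, key, value):
        # do cool things here
        return key

full_data = {"a": 1, "b": 2}  # placeholder data

actors = [FeatureWorker.remote() for _ in range(4)]
pool = ActorPool(actors)
# map hands each (key, value) pair to whichever actor is free next.
results = list(
    pool.map(lambda actor, kv: actor.do_stuff.remote(*kv), list(full_data.items()))
)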

@rliaw sorry to dig up ancient history (2 years ago) – do you know if we have official docs for this feature?

Hi @Dmitri, I forgot to follow up on this, but I actually realized the most idiomatic / natural solution (at least for my use case) was simply to submit tasks that request a fraction of a "large-node" resource. In hindsight this was fairly obvious, but as a newbie I overthought the solution by reaching for the multiprocessing library.

So for anyone who stumbles across this later, what I effectively did was define my worker type as follows:

rayLargeWorkerType:
        # minWorkers is the minimum number of Ray workers of this pod type to keep running.
        minWorkers: 0
        # maxWorkers is the maximum number of Ray workers of this pod type to which Ray will scale.
        maxWorkers: 150  # 150 nodes * 24 cores each (assuming c5ad.12xlarge: 24 physical cores, 96 GiB memory)
        # memory is the memory used by this Pod type.
        # (Used for both requests and limits.)
        memory: 86Gi  # Note: this won't bin-pack perfectly, but it will leave space for the object store
        # CPU is the number of CPUs used by this pod type.
        # (Used for both requests and limits. Must be an integer, as Ray does not support fractional CPUs.)
        CPU: 24
        # GPU is the number of NVIDIA GPUs used by this pod type.
        # (Optional, requires GPU nodes with appropriate setup. See https://docs.ray.io/en/master/cluster/kubernetes-gpu.html)
        GPU: 0
        # rayResources is an optional string-int mapping signalling additional resources to Ray.
        # "CPU", "GPU", and "memory" are filled automatically based on the above settings, but can be overriden;
        # For example, rayResources: {"CPU": 0} can be used in the head podType to prevent Ray from scheduling tasks on the head.
        # See https://docs.ray.io/en/master/advanced.html#dynamic-remote-parameters for an example of usage of custom resources in a Ray task.
        rayResources: {"large-node": 1}
        # Optionally, set a node selector for this Pod type. See https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#nodeselector
        nodeSelector: {"node-class": "compute-large"}

The nodeSelector points to an EKS nodegroup in my K8s cluster that can scale up to a maximum of 150 nodes. Then, when submitting tasks I did the following:

futures = [process_file.options(resources={"large-node": 0.04}).remote(s3_bucket, s3_key) for s3_key in s3_keys]

In theory, this then fits 25 tasks per node (although of course it won’t pack exactly, since some CPU/memory is reserved for other processes).
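For completeness, the end-to-end shape of the submission looks like this (a sketch; process_file’s body is elided, and the bucket/keys are placeholders):

import ray

@ray.remote
def process_file(s3_bucket, s3_key):
    ...  # read the file from S3 and extract features

s3_bucket = "my-bucket"                   # placeholder
s3_keys = ["file-0.json", "file-1.json"]  # placeholder keys

# Each task holds 0.04 of a "large-node", so up to 25 tasks run per pod.
futures = [
    process_file.options(resources={"large-node": 0.04}).remote(s3_bucket, s3_key)
    for s3_key in s3_keys
]
results = ray.get(futures)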

Hopefully this helps anyone with a similar problem in the future!


Yep! No need to stack multiprocessing libraries. Use-cases for fractional resources should be made clearer in the Ray docs.