Ray k8s cluster, communication is slow

Hi, I launched a k8s ray cluster. I created a worker in a worker node and this worker works as a buffer. In the head node, I can use ray.get(buffer.sample.remote(batch_size) to get the data from the remote buffer. However, for large batch size, say 128*50*(40*40*3*4+72*128*3+8), it will take 2-3 seconds to get the data.

I used ray.init("auto") to launch to the ray. Are there any suggestions to reduce the communication cost?

For large data, if you move the data around, it’ll cost more time. To avoid this, you can just pass the ObjectRef around to avoid unnecessary copying the data.

For example, in the driver:

obj_ref = buffer.sample.remote(batch_size)
result = compute.remote(obj_ref)
print(ray.get(result))

Here the compute is a remote call and ray will try to schedule it to the place that ray think is the best place to run. Usually, if the node with obj_ref has resources to run the task, it’ll just be scheduled there to reduce the communication cost.

Hi, thank you. I need to get the sampled data to train the DL model in the head node in my code.

samples = ray.get(buffer.sample.remote(batch_size))  # get the data
dl_model.train(samples)  # train the model with gpu

the main process is in the head node and the buffer is in another worker node.

hi @GoingMyWay there are multiple factors it could slow down.

  1. network throughput.
  2. due to insufficient object store capacity, your data is actually hitting disk so the performance is bounded by disk io.

Do you have the full script to reproduce this issue? Also, can you run ray memory --stats-only to see if actually the object was spilled to disk?

Hi @Chen_Shen, my code is a bit complex and I would like to share the yaml file that was used to create a new cluster.

apiVersion: cluster.ray.io/v1
kind: RayCluster
metadata:
  name: ray-cluster
spec:
  # The maximum number of workers nodes to launch in addition to the head node.
  maxWorkers: 100
  # The autoscaler will scale up the cluster faster with higher upscaling speed.
  # E.g., if the task requires adding more nodes then autoscaler will gradually
  # scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
  # This number should be > 0.
  upscalingSpeed: 1.0
  # If a node is idle for this many minutes, it will be removed.
  idleTimeoutMinutes: 5
  # Specify the pod type for the ray head node (as configured below).
  headPodType: rayHead
  # Specify the allowed pod types for this ray cluster and the resources they provide.
  podTypes:
    - name: rayHead
      minWorkers: 0
      maxWorkers: 0
      rayResources: {}
      podConfig:
        apiVersion: v1
        kind: Pod
        metadata:
          generateName: ray-head-
        spec:
          imagePullSecrets:
            - name: gitlab-cr-pull-secret
            - name: regcred
          priorityClassName: high
          restartPolicy: Never
          # This volume allocates shared memory for Ray to use for its plasma
          # object store. If you do not provide this, Ray will fall back to
          # /tmp which cause slowdowns if is not a shared memory volume.
          volumes:
            - name: workspace-vol
              hostPath:
                path: /mnt/home/%USER/Projects/work_dir
                type: Directory 
            - name: dshm
              emptyDir:
                medium: Memory
          containers:
            - name: ray-node
              imagePullPolicy: Always
              image: "the.image:tag"
              # Do not change this command - it keeps the pod alive until it is
              # explicitly killed.
              command: ["/bin/bash", "-c", "--"]
              args: ["trap : TERM INT; sleep infinity & wait;"]
              env:
                - name: RAY_gcs_server_rpc_server_thread_num
                  value: "1"
              ports:
                - containerPort: 6379 # Redis port for Ray <= 1.10.0. GCS server port for Ray >= 1.11.0.
                - containerPort: 10001 # Used by Ray Client
                - containerPort: 8265 # Used by Ray Dashboard
                - containerPort: 8000 # Used by Ray Serve

              # This volume allocates shared memory for Ray to use for its plasma
              # object store. If you do not provide this, Ray will fall back to
              # /tmp which cause slowdowns if is not a shared memory volume.
              volumeMounts:
                - name: workspace-vol
                  mountPath: /home/me/app/
                  readOnly: false
                - mountPath: /dev/shm
                  name: dshm
              resources:
                requests:
                  cpu: 10
                  memory: 100Gi
                  nvidia.com/gpu: 1
                limits:
                  cpu: 10
                  # The maximum memory that this pod is allowed to use. The
                  # limit will be detected by ray and split to use 10% for
                  # redis, 30% for the shared memory object store, and the
                  # rest for application memory. If this limit is not set and
                  # the object store size is not set manually, ray will
                  # allocate a very large object store in each pod that may
                  # cause problems for other pods.
                  memory: 50Gi
                  nvidia.com/gpu: 1
          nodeSelector: {}
          tolerations: []
    - name: rayWorker
      minWorkers: 2
      maxWorkers: 2
      rayResources: {}
      podConfig:
        apiVersion: v1
        kind: Pod
        metadata:
          generateName: ray-worker-
        spec:
          imagePullSecrets:
            - name: gitlab-cr-pull-secret
            - name: regcred
          priorityClassName: high
          restartPolicy: Never
          # This volume allocates shared memory for Ray to use for its plasma
          # object store. If you do not provide this, Ray will fall back to
          # /tmp which cause slowdowns if is not a shared memory volume.
          volumes:
            - name: workspace-vol
              hostPath:
                path: /mnt/home/%USER/Projects/work_dir
                type: Directory
            - name: dshm
              emptyDir:
                medium: Memory
          containers:
            - name: ray-node
              imagePullPolicy: Always
              image: "the.image:tag"
              # Do not change this command - it keeps the pod alive until it is
              # explicitly killed.
              command: ["/bin/bash", "-c", "--"]
              args: ["trap : TERM INT; sleep infinity & wait;"]
              env:
                - name: RAY_gcs_server_rpc_server_thread_num
                  value: "1"
              ports:
                - containerPort: 6379 # Redis port for Ray <= 1.10.0. GCS server port for Ray >= 1.11.0.
                - containerPort: 10001 # Used by Ray Client
                - containerPort: 8265 # Used by Ray Dashboard
                - containerPort: 8000 # Used by Ray Serve

              # This volume allocates shared memory for Ray to use for its plasma
              # object store. If you do not provide this, Ray will fall back to
              # /tmp which cause slowdowns if is not a shared memory volume.
              volumeMounts:
                - name: workspace-vol
                  mountPath: /home/me/app/
                  readOnly: false
                - mountPath: /dev/shm
                  name: dshm
              resources:
                requests:
                  cpu: 33
                  memory: 50Gi
                  nvidia.com/gpu: 0
                limits:
                  cpu: 33
                  # The maximum memory that this pod is allowed to use. The
                  # limit will be detected by ray and split to use 10% for
                  # redis, 30% for the shared memory object store, and the
                  # rest for application memory. If this limit is not set and
                  # the object store size is not set manually, ray will
                  # allocate a very large object store in each pod that may
                  # cause problems for other pods.
                  memory: 100Gi
                  nvidia.com/gpu: 0
          nodeSelector: {}
          tolerations: []
          
  # Commands to start Ray on the head node. You don't need to change this.
  # Note dashboard-host is set to 0.0.0.0 so that Kubernetes can port forward.
  headStartRayCommands:
    - ray stop
    - ulimit -n 65536; ray start --head --port=6379 --no-monitor --dashboard-host 0.0.0.0
  # Commands to start Ray on worker nodes. You don't need to change this.
  workerStartRayCommands:
    - ray stop
    - ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379

The output of ray memory --stats-only is:

======== Object references status: 2022-06-15 13:08:16.505435 ========
--- Aggregate object store stats across all nodes ---
Plasma memory usage 17 MiB, 31 objects, 0.02% full, 0.0% needed
Objects consumed by Ray tasks: 8822743 MiB.
1 Like

Hi, @Chen_Shen, I set object_store_memory in ray.init but it reports the following error,

ValueError: When connecting to an existing cluster, object_store_memory must not be provided.

thanks! it’s a bit odd as your object store is quite big compared to the object you are trying to transfer; so most likely you are bottlenecked by the object pull/pushing.

. However, for large batch size, say 128*50*(40*40*3*4+72*128*3+8)

Just to confirm, does this mean the number of rows is 128*50*(40*40*3*4+72*128*3+8), which equals ~300M of rows? if so, what’s the schema of each row? if you have a couple of columns it could easily become a few, even 10s of GBs of data.

Hi, Dear @Chen_Shen, One thing I want to note here is that I define a Buffer class which is an actor,

buffer = ray.remote(ReplayBufferwithQueue).options(name=f"Buffer",
                                                   num_cpus=2,
                                                   max_concurrency=10,
                                                   num_gpus=0).remote(
    scheme=scheme,
    groups=groups,
    buffer_size=args.buffer_size,
    max_seq_length=min(env_info["episode_limit"], args.mp_episode_truncate_len) + 1,
    preprocess=preprocess,
    device="cpu" if args.buffer_cpu_only else args.device,
    queue=queue,
    args=args,
)
assert ray.get(buffer.ready.remote())
ray_ws = [buffer.run.remote()]

Here I define the run function in this way:

class ReplayBufferwithQueue(ReplayBuffer):
    def __init__(self, scheme, groups, buffer_size, max_seq_length, preprocess=None, device="cpu", queue=None, buffer_queue=None, args=None):
        super().__init__(scheme, groups, buffer_size, max_seq_length, preprocess, device, args=args)
        logging.basicConfig(level="INFO")
        self.queue = queue
        self.buffer_queue = buffer_queue

    def run(self):
        count = 0
        try:
            while True:
                if self.queue.qsize() == 0:
                    time.sleep(1)
                
                st = time.time()
                episode_batch = self.queue.get()
                self.insert_episode_batch(episode_batch)
                logging.debug(f"Get new episdoe data, now count: {count}, time cost: {(time.time()-st):.2f}")
                count += 1
        except Exception as e:
            logging.error(f"Error in training: {e}")
            traceback.print_exc()

In the main process, I use ray.get(buffer.sample.remote(batch_size)) to sample the data. I think the reason why ray.get here is slow is that buffer actor is actually running in the background and it needs to switch from the run function to sample function, so it takes much longer to get the data. Is it correct?

Hi @Chen_Shen Thanks for the reply. That is the total bytes. Actually, the data is much larger, say 50*1000*(7*88*88*3 + 102*179*3 + 120) bytes.

@Chen_Shen, I used ray’s queue in the Buffer’s run function, and in the main process, I can get the data via queue.get(), it will avoid the concurrent issue.

    def run(self):
        count = 0
        try:
            while True:
                if self.queue.qsize() == 0:
                    time.sleep(1)
                
                st = time.time()
                episode_batch = self.queue.get()
                self.insert_episode_batch(episode_batch)
                logging.debug(f"Get new episdoe data, now count: {count}, time cost: {(time.time()-st):.2f}")
                count += 1
                queue.put(self.sample(batch_size))  # sample and put the data into the queue.
        except Exception as e:
            logging.error(f"Error in training: {e}")
            traceback.print_exc()

But, it seems queue.get()is still slow. I guess the reason is ray.get() is slow because ray’s queue.get() was built on top of ray.get(). Do you have any suggestions to change the setting to make it faster?

@Chen_Shen, Hi, I found a script to demonstrate this issue

import ray
import time
import numpy as np


ray.init()


@ray.remote(num_cpus=1)
class test:
    def __init__(self, shape):
        self.shape = shape
        self.np_array = np.zeros(shape, dtype=bool)

    def get_col_slice(self, col_idx):
        return self.np_array[:, col_idx]

    def get_array(self):
        return self.np_array

    def write_to_slice(self, col_idx, nrows):
        self.np_array[:, col_idx] = [1] * nrows

    def get_row_slice(self, start, stop):
        return self.np_array[start:stop, :]


shape = (600, 600, 600, 100)
test_actor = test.remote(shape)

st = time.time()
# Method 1 to get the array (results in copy, wasting memory)
result_array = ray.get(test_actor.get_array.remote())
print(f'time cost: {(time.time()-st):.2f}')

It will take much time to get the data. See this link: Ray get on actor method is super slow - Question · Issue #15977 · ray-project/ray · GitHub

In the main process, I use ray.get(buffer.sample.remote(batch_size)) to sample the data. I think the reason why ray.get here is slow is that buffer actor is actually running in the background and it needs to switch from the run function to sample function, so it takes much longer to get the data. Is it correct?

Yes, that’s very likely that when you call sample, the buffer actor is actually busy running run function. You can add some logging to confirm the theory.

And for the repro script

...
shape = (600, 600, 600, 100)
test_actor = test.remote(shape)

st = time.time()
# Method 1 to get the array (results in copy, wasting memory)
result_array = ray.get(test_actor.get_array.remote())
print(f'time cost: {(time.time()-st):.2f}')

The data is quite huge. So the slowness might be caused by data being spilled onto disk.

Hi, since the task is running in the same machine. The obj memory stores the data. I think the main process can get the data without copying the data right and the cost could be small.

@GoingMyWay

Hey @GoingMyWay, any chance you verified that your sample call is blocked by run?

@Chen_Shen, I think sample was not blocked by run. The reason is that the sampled data is actually very large. I guess Ray did not optimize the data transferring process.