Cluster doesn't distribute workloads

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

This code only runs tasks on the head node and never completes:

    import numpy as np
    from collections import namedtuple
    import ray

    ray.init(address='auto')

    callback_entry = namedtuple('callback_entry', ['index', 'value'])
    result_list = []

    def gathering_results_callback(entry):
        result_list[entry.index] = entry.value

    @ray.remote
    def f(G, temp_index):
        # G = np.random.randint(2, size=(N, N))
        temp = np.mean(G)
        print(temp)
        return callback_entry(index=temp_index, value=temp)

    result_list = np.zeros(500)

    # Store the big array once in the object store and pass the ObjectRef to every task.
    A = np.random.randint(2, size=(4848, 4848))
    ray_param1 = ray.put(A)

    object_ids = [f.remote(ray_param1, temp_index) for temp_index in range(500)]

    results_from_ray = ray.get(object_ids)

    for entry in results_from_ray:
        gathering_results_callback(entry)

    ray.shutdown()
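
As a sanity check (not part of the script above), this is the minimal probe I use to see which node a task actually lands on; it only relies on the standard library and on launching more tasks than the head node's 5 CPUs:

    import socket
    import ray

    ray.init(address='auto')

    @ray.remote
    def where_am_i():
        # Report the hostname of the node this task executed on.
        return socket.gethostname()

    # Launch more tasks than the head node's 5 CPUs so some should spill to the worker.
    print(set(ray.get([where_am_i.remote() for _ in range(50)])))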

This code runs on both the head node and the worker node, and it completes:

    import numpy as np
    from collections import namedtuple
    import ray

    ray.init(address='auto')

    callback_entry = namedtuple('callback_entry', ['index', 'value'])
    result_list = []

    def gathering_results_callback(entry):
        result_list[entry.index] = entry.value

    @ray.remote
    def f(N, temp_index):
        # Each task generates its own array instead of receiving one via ray.put().
        G = np.random.randint(2, size=(N, N))
        temp = np.mean(G)
        print(temp)
        return callback_entry(index=temp_index, value=temp)

    result_list = np.zeros(500)

    # A = np.random.randint(2, size=(4848, 4848))
    # ray_param1 = ray.put(A)

    object_ids = [f.remote(4848, temp_index) for temp_index in range(500)]

    results_from_ray = ray.get(object_ids)

    for entry in results_from_ray:
        gathering_results_callback(entry)

    ray.shutdown()

Output from the second script, with tasks running on both the worker (ip=192.168.0.50) and the head node:

    (f pid=893944, ip=192.168.0.50) 0.49983840432310556
    (f pid=894078, ip=192.168.0.50) 0.4998172581800259
    (f pid=893773, ip=192.168.0.50) 0.5000890095197639
    (f pid=894361, ip=192.168.0.50) 0.4999058422240739
    (f pid=893754, ip=192.168.0.50) 0.4998943118321733
    (f pid=893771, ip=192.168.0.50) 0.4998878446012918
    (f pid=894122, ip=192.168.0.50) 0.4999646004204381
    (f pid=1593763) 0.4998877595061486
    (f pid=1593765) 0.49985116859458223
    (f pid=893912, ip=192.168.0.50) 0.4999485599859491
    (f pid=894017, ip=192.168.0.50) 0.49994566675108104
    (f pid=893782, ip=192.168.0.50) 0.5000581199827904
    (f pid=893900, ip=192.168.0.50) 0.49991320295395875
    (f pid=893821, ip=192.168.0.50) 0.4999603031157076
    (f pid=893829, ip=192.168.0.50) 0.4998825261548432
    (f pid=893877, ip=192.168.0.50) 0.4998428292705508
    (f pid=1593849) 0.4999229463478526
    (f pid=1593762) 0.49998498070722913
    (f pid=1593764) 0.49975007556448714
    (f pid=893792, ip=192.168.0.50) 0.5000395266940061
    (f pid=894104, ip=192.168.0.50) 0.5000339955096995
    (f pid=894087, ip=192.168.0.50) 0.49984465881612916
    (f pid=893743, ip=192.168.0.50) 0.5000178699800674
    (f pid=893760, ip=192.168.0.50) 0.5000872650693288
    (f pid=893996, ip=192.168.0.50) 0.4997587978166629

Is this pattern impossible on a cluster?

    a_id = ray.put(numpy array)
    ActorFunction.remote(a_id, …)
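
Put differently, this is the minimal form of the pattern I expect to work (the array size and task count here are just placeholders for the real workload):

    import numpy as np
    import ray

    ray.init(address='auto')

    @ray.remote
    def mean_of(block):
        # `block` is fetched from the shared object store on whichever node runs the task.
        return np.mean(block)

    arr = np.random.randint(2, size=(1000, 1000))  # placeholder size
    arr_ref = ray.put(arr)                         # store once, pass the ObjectRef to every task

    print(ray.get([mean_of.remote(arr_ref) for _ in range(10)]))
    ray.shutdown()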

My cluster YAML:

    # A unique identifier for the head node and workers of this cluster.
    cluster_name: ray_cluster_local

    # Running Ray in Docker images is optional (this docker section can be commented out).
    # This executes all commands on all nodes in the docker container,
    # and opens all the necessary ports to support the Ray cluster.
    # Empty string means disabled. Assumes Docker is installed.
    #docker:
    #    image: "rayproject/ray-ml:latest-gpu" # You can change this to latest-cpu if you don't need GPU support and want a faster startup
        # image: rayproject/ray:latest-gpu   # use this one if you don't need ML dependencies, it's faster to pull
        #    container_name: "ray_container"
        # If true, pulls latest version of image. Otherwise, `docker run` will only pull the image
        # if no cached version is present.
        #pull_before_run: True
        #run_options:   # Extra options to pass into "docker run"
        #    - --ulimit nofile=65536:65536

    provider:
        type: local
        head_ip: 192.168.0.13
        # You may need to supply a public ip for the head node if you need
        # to run `ray up` from outside of the Ray cluster's network
        # (e.g. the cluster is in an AWS VPC and you're starting ray from your laptop)
        # This is useful when debugging the local node provider with cloud VMs.
        # external_head_ip: '192.168.0.48'
        #worker_ips: ['192.168.0.49']
        worker_ips: ['192.168.0.50']
        # Optional when running automatic cluster management on prem. If you use a coordinator server,
        # then you can launch multiple autoscaling clusters on the same set of machines, and the coordinator
        # will assign individual nodes to clusters as needed.
        #    coordinator_address: "<host>:<port>"

    # How Ray will authenticate with newly launched nodes.
    auth:
        ssh_user: user1
        # You can comment out `ssh_private_key` if the following machines don't need a private key for SSH access to the Ray
        # cluster:
        #   (1) The machine on which `ray up` is executed.
        #   (2) The head node of the Ray cluster.
        #
        # The machine that runs ray up executes SSH commands to set up the Ray head node. The Ray head node subsequently
        # executes SSH commands to set up the Ray worker nodes. When you run ray up, ssh credentials sitting on the ray up
        # machine are copied to the head node -- internally, the ssh key is added to the list of file mounts to rsync to head node.
        #ssh_private_key: ~/.ssh/id_rsa
        ssh_public_key: ~/.ssh/id_rsa.pub

    # The minimum number of workers nodes to launch in addition to the head
    # node. This number should be >= 0.
    # Typically, min_workers == max_workers == len(worker_ips).
    # This field is optional.
    min_workers: 1

    # The maximum number of workers nodes to launch in addition to the head node.
    # This takes precedence over min_workers.
    # Typically, min_workers == max_workers == len(worker_ips).
    # This field is optional.
    max_workers: 1
    # The default behavior for manually managed clusters is
    # min_workers == max_workers == len(worker_ips),
    # meaning that Ray is started on all available nodes of the cluster.
    # For automatically managed clusters, max_workers is required and min_workers defaults to 0.

    # The autoscaler will scale up the cluster faster with higher upscaling speed.
    # E.g., if the task requires adding more nodes then autoscaler will gradually
    # scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
    # This number should be > 0.
    upscaling_speed: 1.0

    idle_timeout_minutes: 1

    # Files or directories to copy to the head and worker nodes. The format is a
    # dictionary from REMOTE_PATH: LOCAL_PATH. E.g. you could save your conda env to an environment.yaml file, mount
    # that directory to all nodes and call `conda -n my_env -f /path1/on/remote/machine/environment.yaml`. In this
    # example paths on all nodes must be the same (so that conda can be called always with the same argument)
    file_mounts: {
    #    "/path1/on/remote/machine": "/path1/on/local/machine",
    #    "/path2/on/remote/machine": "/path2/on/local/machine",
    }

    # Files or directories to copy from the head node to the worker nodes. The format is a
    # list of paths. The same path on the head node will be copied to the worker node.
    # This behavior is a subset of the file_mounts behavior. In the vast majority of cases
    # you should just use file_mounts. Only use this if you know what you're doing!
    cluster_synced_files: []

    # Whether changes to directories in file_mounts or cluster_synced_files in the head node
    # should sync to the worker node continuously
    file_mounts_sync_continuously: False

    # Patterns for files to exclude when running rsync up or rsync down
    rsync_exclude:
        - "**/.git"
        - "**/.git/**"

    # Pattern files to use for filtering out files when running rsync up or rsync down. The file is searched for
    # in the source directory and recursively through all subdirectories. For example, if .gitignore is provided
    # as a value, the behavior will match git's behavior for finding and using .gitignore files.
    rsync_filter:
        - ".gitignore"

    # List of commands that will be run before `setup_commands`. If docker is
    # enabled, these commands will run outside the container and before docker
    # is setup.
    initialization_commands: []
    # List of shell commands to run to set up each nodes.
    setup_commands: []
        # If we have e.g. conda dependencies stored in "/path1/on/local/machine/environment.yaml", we can prepare the
        # work environment on each worker by:
        #   1. making sure each worker has access to this file i.e. see the `file_mounts` section
        #   2. adding a command here that creates a new conda environment on each node or if the environment already exists,
        #     it updates it:
        #      conda env create -q -n my_venv -f /path1/on/local/machine/environment.yaml || conda env update -q -n my_venv -f /path1/on/local/machine/environment.yaml
        #

    # Custom commands that will be run on the head node after common setup.
    head_setup_commands: []

    # Custom commands that will be run on worker nodes after common setup.
    worker_setup_commands: []
    # Command to start ray on the head node. You don't need to change this.
    head_start_ray_commands:
      # If we have e.g. conda dependencies, we could create on each node a conda environment (see `setup_commands` section).
      # In that case we'd have to activate that env on each node before running `ray`:
      # - conda activate my_venv && ray stop
      # - conda activate my_venv && ulimit -c unlimited && ray start --head --port=6379 --autoscaling-config=~/ray_bootstrap_config.yaml
        - conda activate py36 && ray stop
        - conda activate py36 && ulimit -c unlimited && RAY_REDIS_ADDRESS=192.168.0.13:6000 RAY_scheduler_spread_threshold=0.0 ray start --port=6666 --head --node-ip-address=192.168.0.13 --include-dashboard=false --num-cpus=5 --num-gpus=1 --memory=$((13*$((2**30)))) --redis-password=123456 --autoscaling-config=~/ray_bootstrap_config.yaml
    # Command to start ray on worker nodes. You don't need to change this.
    worker_start_ray_commands:
      # If we have e.g. conda dependencies, we could create on each node a conda environment (see `setup_commands` section).
      # In that case we'd have to activate that env on each node before running `ray`:
      # - conda activate my_venv && ray stop
      # - ray start --address=$RAY_HEAD_IP:6379
        - conda activate py36 && ray stop
        - conda activate py36 && RAY_scheduler_spread_threshold=0.0 ray start --num-cpus=50 --memory=$((40*$((2**30)))) --address=192.168.0.13:6666 --node-manager-port 40405 --object-manager-port 42015
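
For completeness, this is how I check from the head node that both machines joined the cluster with the resources set in the start commands above (just a quick sketch, nothing beyond the two IPs in the YAML):

    import ray

    ray.init(address='auto')

    # Expect entries for 192.168.0.13 (head) and 192.168.0.50 (worker),
    # with the CPU/GPU/memory values passed to `ray start` above.
    for node in ray.nodes():
        print(node["NodeManagerAddress"], node["Alive"], node["Resources"])

    print(ray.cluster_resources())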