Ray 1.7.0 ray.init(runtime_env=) kills cluster (was: cluster stuck on "The actor or task with ID [] cannot be scheduled right now")

We just installed a new Ray 1.7.0 cluster to replace our existing Ray 1.6.0 cluster, but all our existing jobs now hang on the new cluster.

As an example, we submit this simple job, which submits 5 tasks and waits for them to complete. It runs fine on our 1.6.0 cluster but gets stuck immediately on our 1.7.0 cluster.

Any suggestion on where to look?

# test.py

# Imports
import logging as log
logFormat = '[%(asctime)s %(levelname)-7s] %(message)s'
# Configure the root logger before importing ray
log.basicConfig(format=logFormat, level=log.INFO)
import ray


@ray.remote(num_cpus=1)
def task(id):
    return id

ray.init(
    address='auto',
    runtime_env={
        "env_vars": {"AUTOSCALER_EVENTS":"0"},
        "conda": "dan-1",
        },
    )

# Submit jobs
jobs = []
for id in range(5):
    log.info(f"Submitting task {id}")
    jobs.append(task.remote(id))

# Wait for results
totalJobs = len(jobs)
log.info(f"Waiting for {totalJobs} Ray training jobs")
resultMap = {}
results = []
while jobs:
    [ready], jobs = ray.wait(jobs)
    id = ray.get(ready)
    log.info(f"Received id {id}")
    resultMap[id] = id
    # Move buffered results into the ordered list once the next expected index has arrived
    while resultMap:
        nextGroupIndex = len(results)
        if nextGroupIndex in resultMap:
            results.append(resultMap.pop(nextGroupIndex))
        else:
            break
log.info("Complete")

Here is the output we see:

⇒  ray exec /ceph/var/ray/tools/docker/ray05/ray.yaml 'cd /ceph/home/djakubiec/elcano/python.focusvq/focusvq/djakubiec ; conda activate dan-1 ; python3 test.py'
Loaded cached provider configuration
If you experience issues with the cloud provider, try re-running the command with --no-config-cache.
Fetched IP: 10.0.1.34
[2021-10-14 15:56:34,813 INFO   ] Using preinstalled conda environment: dan-1
2021-10-14 15:56:34,816	INFO worker.py:826 -- Connecting to existing Ray cluster at address: 10.0.1.34:12001
[2021-10-14 15:56:34,945 INFO   ] Submitting task 0
[2021-10-14 15:56:34,946 INFO   ] Submitting task 1
[2021-10-14 15:56:34,946 INFO   ] Submitting task 2
[2021-10-14 15:56:34,947 INFO   ] Submitting task 3
[2021-10-14 15:56:34,947 INFO   ] Submitting task 4
[2021-10-14 15:56:34,947 INFO   ] Waiting for 5 Ray training jobs
2021-10-14 15:56:48,339	WARNING worker.py:1227 -- The actor or task with ID e48bf7636167cc5302b088b00562eadceafab5117718486c cannot be scheduled right now. You can ignore this message if this Ray cluster is expected to auto-scale or if you specified a runtime_env for this actor or task, which may take time to install.  Otherwise, this is likely due to all cluster resources being claimed by actors. To resolve the issue, consider creating fewer actors or increasing the resources available to this Ray cluster.
Required resources for this actor or task: {CPU: 1.000000}
Available resources on this node: {127.000000/128.000000 CPU, 3032691870.019531 GiB/3032691870.019531 GiB memory, 1508988479.980469 GiB/1508988479.980469 GiB object_store_memory, 1.000000/1.000000 node:10.0.1.34}
In total there are 1 pending tasks and 0 pending actors on this node.

And here is the autoscaler status:

2021-10-14 15:57:09,522	INFO autoscaler.py:233 --
======== Autoscaler status: 2021-10-14 15:57:09.522503 ========
Node status
---------------------------------------------------------------
Healthy:
 1 local.cluster.node
Pending:
 (no pending nodes)
Recent failures:
 (no failures)

Resources
---------------------------------------------------------------

Usage:
 1.0/128.0 CPU
 0.00/57.844 GiB memory
 0.00/28.782 GiB object_store_memory

Demands:
 {'CPU': 1.0}: 2+ pending tasks/actors

Our ray.yaml:

# A unique identifier for the head node and workers of this cluster.
cluster_name: fvq05

# Running Ray in Docker images is optional (this docker section can be commented out).
# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty string means disabled. Assumes Docker is installed.
docker:
    # The associated docker image must be built on each worker node.  You can run this as focusvq on each node:
    #
    #   docker build -t focusvq/ray01 /ceph/var/ray/tools/docker/ray01
    #
    # image: "rayproject/ray-ml:latest-gpu" # You can change this to latest-cpu if you don't need GPU support and want a faster startup
    # image: rayproject/ray:latest-gpu   # use this one if you don't need ML dependencies, it's faster to pull
    # container_name: "ray_container"
    image: "focusvq/ray05:latest"
    container_name: "fvq_ray05"
    # If true, pulls latest version of image. Otherwise, `docker run` will only pull the image
    # if no cached version is present.
    pull_before_run: False
    # Extra options to pass into "docker run"
    run_options: [ 
        '--memory=96g',
        '--shm-size=32g',
        #'--mount type=bind,source=/var/ray,target=/var/ray',
        '--mount type=bind,source=/var/run/flcs.sock,target=/var/run/flcs.sock',
        '--mount type=bind,source=/home/focusvq,target=/home/focusvq,readonly',
        '--mount type=bind,source=/ceph,target=/ceph,readonly',
        '--mount type=bind,source=/ceph/var/ray/share,target=/share',
        '--group-add 1010',     # fvq-metrics
        '--group-add 5000',     # fvq-dev
        '--group-add 5001',     # fvq-web
        #'--group-add 5002',     # fvq-git
        '--group-add 5003',     # fvq-external
        '--group-add 5005',     # fvq-ops
        '--group-add 5007',     # fvq-log
        #'--user=501:501',
        '--ulimit nofile=65536:65536',
        ]

# FocusAE NOTE: Avoid using hostnames below since we generally map those to loopback addresses.  That confuses certain parts of the Ray cluster assembly
provider:
    type: local
    #head_ip: YOUR_HEAD_NODE_HOSTNAME
    #head_ip: 10.0.1.34
    # You may need to supply a public ip for the head node if you need
    # to run `ray up` from outside of the Ray cluster's network
    # (e.g. the cluster is in an AWS VPC and you're starting ray from your laptop)
    # This is useful when debugging the local node provider with cloud VMs.
    # external_head_ip: YOUR_HEAD_PUBLIC_IP
    #worker_ips: [WORKER_NODE_1_HOSTNAME, WORKER_NODE_2_HOSTNAME, ... ]
    # NOTE: Avoid using hostnames, since the Focus cluster maps these to loopback addresses and confuses some stages of Ray
    #worker_ips: [ 10.0.1.1, 10.0.1.2 ]
    #worker_ips: [ 10.0.1.1, 10.0.1.2, 10.0.1.3, 10.0.1.4, 10.0.1.5, 10.0.1.6, 10.0.1.7, 10.0.1.8, 10.0.1.9, 10.0.1.10, 10.0.1.11, 10.0.1.12, 10.0.1.13, 10.0.1.14, 10.0.1.15, 10.0.1.16, 10.0.1.17, 10.0.1.18, 10.0.1.19, 10.0.1.20, 10.0.1.21, 10.0.1.22, 10.0.1.23, 10.0.1.24, 10.0.1.25, 10.0.1.26, 10.0.1.27, 10.0.1.28, 10.0.1.29, 10.0.1.30, 10.0.1.31, 10.0.1.32, 10.0.1.33, 10.0.1.34, 10.0.1.35, 10.0.1.36, 10.0.1.37, 10.0.1.38, 10.0.1.39, 10.0.1.40 ]
    # Optional when running automatic cluster management on prem. If you use a coordinator server,
    # then you can launch multiple autoscaling clusters on the same set of machines, and the coordinator
    # will assign individual nodes to clusters as needed.
    #    coordinator_address: "<host>:<port>"
    coordinator_address: "10.0.1.250:1300"

# How Ray will authenticate with newly launched nodes.
auth:
    # ssh_user: YOUR_USERNAME
    # You can comment out `ssh_private_key` if the following machines don't need a private key for SSH access to the Ray
    # cluster:
    #   (1) The machine on which `ray up` is executed.
    #   (2) The head node of the Ray cluster.
    #
    # The machine that runs ray up executes SSH commands to set up the Ray head node. The Ray head node subsequently
    # executes SSH commands to set up the Ray worker nodes. When you run ray up, ssh credentials sitting on the ray up
    # machine are copied to the head node -- internally, the ssh key is added to the list of file mounts to rsync to head node.
    # ssh_private_key: ~/.ssh/id_rsa
    ssh_user: focusvq
    ssh_private_key: ~/.ssh/id_rsa

# The minimum number of workers nodes to launch in addition to the head
# node. This number should be >= 0.
# Typically, min_workers == max_workers == len(worker_ips).
min_workers: 0

# The maximum number of workers nodes to launch in addition to the head node.
# This takes precedence over min_workers.
# Typically, min_workers == max_workers == len(worker_ips).
max_workers: 43
# The default behavior for manually managed clusters is
# min_workers == max_workers == len(worker_ips),
# meaning that Ray is started on all available nodes of the cluster.
# For automatically managed clusters, max_workers is required and min_workers defaults to 0.

# The autoscaler will scale up the cluster faster with higher upscaling speed.
# E.g., if the task requires adding more nodes then autoscaler will gradually
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
# This number should be > 0.
upscaling_speed: 1.0

idle_timeout_minutes: 5

# Files or directories to copy to the head and worker nodes. The format is a
# dictionary from REMOTE_PATH: LOCAL_PATH. E.g. you could save your conda env to an environment.yaml file, mount
# that directory to all nodes and call `conda -n my_env -f /path1/on/remote/machine/environment.yaml`. In this
# example paths on all nodes must be the same (so that conda can be called always with the same argument)
file_mounts: {
#    "/path1/on/remote/machine": "/path1/on/local/machine",
#    "/path2/on/remote/machine": "/path2/on/local/machine",
}

# Files or directories to copy from the head node to the worker nodes. The format is a
# list of paths. The same path on the head node will be copied to the worker node.
# This behavior is a subset of the file_mounts behavior. In the vast majority of cases
# you should just use file_mounts. Only use this if you know what you're doing!
cluster_synced_files: []

# Whether changes to directories in file_mounts or cluster_synced_files in the head node
# should sync to the worker node continuously
file_mounts_sync_continuously: False

# Patterns for files to exclude when running rsync up or rsync down
rsync_exclude:
    - "**/.git"
    - "**/.git/**"

# Pattern files to use for filtering out files when running rsync up or rsync down. The file is searched for
# in the source directory and recursively through all subdirectories. For example, if .gitignore is provided
# as a value, the behavior will match git's behavior for finding and using .gitignore files.
rsync_filter:
    - ".gitignore"

# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
initialization_commands: []

# List of shell commands to run to set up each nodes.
setup_commands: []
    # If we have e.g. conda dependencies stored in "/path1/on/local/machine/environment.yaml", we can prepare the
    # work environment on each worker by:
    #   1. making sure each worker has access to this file i.e. see the `file_mounts` section
    #   2. adding a command here that creates a new conda environment on each node or if the environment already exists,
    #     it updates it:
    #      conda env create -q -n my_venv -f /path1/on/local/machine/environment.yaml || conda env update -q -n my_venv -f /path1/on/local/machine/environment.yaml
    #
    # Ray developers:
    # you probably want to create a Docker image that
    # has your Ray repo pre-cloned. Then, you can replace the pip installs
    # below with a git checkout <your_sha> (and possibly a recompile).
    # To run the nightly version of ray (as opposed to the latest), either use a rayproject docker image
    # that has the "nightly" (e.g. "rayproject/ray-ml:nightly-gpu") or uncomment the following line:
    # - pip install -U "ray[default] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-2.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl"

# Custom commands that will be run on the head node after common setup.
head_setup_commands: []

# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []

# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
  # If we have e.g. conda dependencies, we could create on each node a conda environment (see `setup_commands` section).
  # In that case we'd have to activate that env on each node before running `ray`:
  # - conda activate my_venv && ray stop
  # - conda activate my_venv && ulimit -c unlimited && ray start --head --port=6379 --autoscaling-config=~/ray_bootstrap_config.yaml
    #- ray stop
    #- ulimit -c unlimited && ray start --head --port=6379 --autoscaling-config=~/ray_bootstrap_config.yaml
    - ray stop
    - ulimit -c unlimited && ray start --head --port=12001 --autoscaling-config=~/ray_bootstrap_config.yaml

# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
  # If we have e.g. conda dependencies, we could create on each node a conda environment (see `setup_commands` section).
  # In that case we'd have to activate that env on each node before running `ray`:
  # - conda activate my_venv && ray stop
  # - ray start --address=$RAY_HEAD_IP:6379
    #- ray stop
    #- ray start --address=$RAY_HEAD_IP:6379
    - ray stop
    - ray start --address=$RAY_HEAD_IP:12001

I was able to get this working in Ray 1.7.0 by removing the runtime_env parameter from ray.init(). Looks like something in 1.7.0 has broken support for this?
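
A minimal sketch of that workaround, assuming the dan-1 conda environment is already activated in the shell before the script starts (as in the ray exec command above), so the runtime_env argument can simply be left out. The helper names build_init_kwargs and connect are hypothetical and only serve to make the two variants easy to compare:

```python
def build_init_kwargs(use_runtime_env, conda_env="dan-1"):
    """Return the keyword arguments for ray.init(), with or without runtime_env."""
    kwargs = {"address": "auto"}
    if use_runtime_env:
        # Original call from test.py: this is the variant that hangs on 1.7.0
        kwargs["runtime_env"] = {
            "env_vars": {"AUTOSCALER_EVENTS": "0"},
            "conda": conda_env,
        }
    return kwargs


def connect(use_runtime_env=False):
    """Connect to the running cluster; defaults to the workaround (no runtime_env)."""
    import ray  # imported lazily so build_init_kwargs can be inspected without Ray
    return ray.init(**build_init_kwargs(use_runtime_env))
```

Calling connect() then behaves like ray.init(address='auto'), while connect(use_runtime_env=True) reproduces the original call.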

This appears to be related to "Ray client fails when specifying Conda Environment".

Opened issue:

Thanks for reporting this and for submitting the reproduction in the issue, we’ll get a fix out as soon as possible.

Hi @djakubiec, can you try running pip install "ray[default]" and see if that fixes things? It includes some dependencies that are required for runtime environments. We’ll add a better error message so it doesn’t silently fail in this case.

Thanks @architkulkarni. We had already run pip install -U "ray[default]==1.7.0", so I don’t think that is the issue. Any other suggestions?
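
One way to double-check that claim from inside the environment the job actually uses (for example after conda activate dan-1) is a short script like the one below. It is only a sketch: the module names in ASSUMED_EXTRA_MODULES are an assumption about what the "default" extra pulls in and should be adjusted to the extras list of the Ray release in use. The function names are hypothetical.

```python
import importlib.util
from importlib import metadata

# Assumption: a few modules believed to be installed by the "default" extra.
# Adjust this tuple to match the extras list of your Ray release.
ASSUMED_EXTRA_MODULES = ("aiohttp", "aiohttp_cors", "prometheus_client")


def installed_version(dist_name):
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


def missing_modules(module_names=ASSUMED_EXTRA_MODULES):
    """Return the names from module_names that cannot be found for import."""
    return [name for name in module_names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    print("ray version:", installed_version("ray"))
    print("missing modules:", missing_modules())
```

If the reported version is not 1.7.0, or any module is listed as missing, the pip install probably ran in a different environment than the one the job uses.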