Ray Java API only executes tasks locally

Hi team, I’m trying out the Ray Java API with a simple example:

I have a cluster consisting of a head node and a worker node.

On both nodes, I have put the jar file under /var/tmp/jars.

Then, from the worker node, I launch my Java application with the VM options below:

-Dray.address=head-node:6379 -Dray.job.code-search-path=/var/tmp/jars

The application runs successfully and produces the logs below:

14:59:29.203 [main] DEBUG io.ray.runtime.DefaultRayRuntimeFactory - Initializing runtime with config: {"ray":{"address":"dal:6379","head-args":[],"job":{"code-search-path":"/var/tmp/jars","default-actor-lifetime":"NON_DETACHED","id":"","jvm-options":[],"namespace":"","num-java-workers-per-process":1,"runtime-env":{"env-vars":{}}},"logging":{"dir":"","level":"INFO","loggers":[],"max-backup-files":10,"max-file-size":"500MB","pattern":"%d{yyyy-MM-dd HH:mm:ss,SSS} %p %c{1} [%t]: %m%n"},"object-store":{"socket-name":null},"raylet":{"node-manager-port":0,"socket-name":null,"startup-token":0},"redis":{"password":"5241590000000000"},"run-mode":"CLUSTER","session-dir":null}}
14:59:29.217 [main] DEBUG io.ray.runtime.util.JniUtils - Loading native library core_worker_library_java in /tmp/ray/1660805969214.
14:59:31.083 [main] DEBUG io.ray.runtime.util.JniUtils - Native library loaded.
14:59:33.177 [main] DEBUG io.ray.runtime.RayNativeRuntime - RayNativeRuntime started with store /tmp/ray/session_2022-08-18_14-52-47_745940_3536306/sockets/plasma_store, raylet /tmp/ray/session_2022-08-18_14-52-47_745940_3536306/sockets/raylet
14:59:33.205 [main] DEBUG io.ray.runtime.functionmanager.FunctionManager - Resource loaded for job 01000000 from path [file:/var/tmp/jars/, file:/var/tmp/jars/ray-0.0.1-SNAPSHOT-all.jar].
14:59:33.267 [main] DEBUG io.ray.runtime.object.ObjectRefImpl - Putting object c8ef45ccd0112571ffffffffffffffffffffffff0100000001000000 to weak reference pool.
14:59:33.273 [main] DEBUG io.ray.runtime.object.ObjectRefImpl - Putting object 16310a0f0a45af5cffffffffffffffffffffffff0100000001000000 to weak reference pool.
14:59:33.273 [main] DEBUG io.ray.runtime.object.ObjectRefImpl - Putting object c2668a65bda616c1ffffffffffffffffffffffff0100000001000000 to weak reference pool.
14:59:33.273 [main] DEBUG io.ray.runtime.object.ObjectRefImpl - Putting object 32d950ec0ccf9d2affffffffffffffffffffffff0100000001000000 to weak reference pool.
14:59:33.273 [main] DEBUG io.ray.runtime.object.ObjectRefImpl - Putting object e0dc174c83599034ffffffffffffffffffffffff0100000001000000 to weak reference pool.
14:59:33.274 [main] DEBUG io.ray.runtime.object.ObjectRefImpl - Putting object f4402ec78d3a2607ffffffffffffffffffffffff0100000001000000 to weak reference pool.
14:59:33.274 [main] DEBUG io.ray.runtime.object.ObjectRefImpl - Putting object f91b78d7db9a6593ffffffffffffffffffffffff0100000001000000 to weak reference pool.
14:59:33.274 [main] DEBUG io.ray.runtime.object.ObjectRefImpl - Putting object 82891771158d68c1ffffffffffffffffffffffff0100000001000000 to weak reference pool.
14:59:33.274 [main] DEBUG io.ray.runtime.object.ObjectRefImpl - Putting object 8849b62d89cb30f9ffffffffffffffffffffffff0100000001000000 to weak reference pool.
14:59:33.274 [main] DEBUG io.ray.runtime.object.ObjectRefImpl - Putting object 80e22aed7718a125ffffffffffffffffffffffff0100000001000000 to weak reference pool.
14:59:33.274 [main] DEBUG io.ray.runtime.AbstractRayRuntime - Getting Objects [c8ef45ccd0112571ffffffffffffffffffffffff0100000001000000, 16310a0f0a45af5cffffffffffffffffffffffff0100000001000000, c2668a65bda616c1ffffffffffffffffffffffff0100000001000000, 32d950ec0ccf9d2affffffffffffffffffffffff0100000001000000, e0dc174c83599034ffffffffffffffffffffffff0100000001000000, f4402ec78d3a2607ffffffffffffffffffffffff0100000001000000, f91b78d7db9a6593ffffffffffffffffffffffff0100000001000000, 82891771158d68c1ffffffffffffffffffffffff0100000001000000, 8849b62d89cb30f9ffffffffffffffffffffffff0100000001000000, 80e22aed7718a125ffffffffffffffffffffffff0100000001000000].
[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
Ray.prepare: 95 ms
Ray.get: 10429 ms
14:59:43.868 [Thread-0] DEBUG io.ray.runtime.gcs.GcsClient - Destroying global state accessor.
14:59:43.869 [Thread-0] DEBUG io.ray.runtime.RayNativeRuntime - RayNativeRuntime shutdown

Process finished with exit code 0

However, from the Ray Dashboard, I notice that ALL tasks are actually running on the worker node, instead of some being distributed to the remote node.

I don’t think this behavior is expected, but I’m not sure what I did wrong.

Any help would be appreciated.

Thanks,
-BS

Does the Ray node started on the other machine have the Java environment set up properly? A Ray node requires two conditions to run Java workers:

  1. The java command is on the PATH when invoking ray start ....
  2. The ray Python package is installed with the Java extra, i.e. pip install 'ray[java]'. Note that the Java libs are not installed by default, so a plain pip install ray is not enough.

Another reason might be that Ray prefers to schedule tasks locally instead of sending them to remote nodes.
You can add .setResource("CPU", 3.0) after Ray.task(MyRayApp::slowFunction) and see what happens; see the sketch below.

Thanks ksfstorm. I didn’t know there was a java extra for ray. However, when I tried pip install ray[java], I got the message below:

(env2) root@risk0:~# pip install ray[java]
Looking in indexes: https://pypi.internal-mirrors.ucloud.cn/simple
Requirement already satisfied: ray[java] in ./anaconda3/envs/env2/lib/python3.7/site-packages (1.13.0)
WARNING: ray 1.13.0 does not provide the extra 'java'

Am I doing something wrong?

Maybe my information is outdated. Could you check the logs on both machines to see if there are any suspicious errors? They should be in /tmp/ray/session_latest/logs.