There is an excellent Anyscale blog post, "Serverless Kafka Stream Processing with Ray", on running Kafka with Ray. At the end it references the ray/streaming library (ray-project/ray on GitHub, master branch). Are there any examples of integrating this library with Kafka?
The issue is that you can’t run StreamingContext.Builder().build() inside a Kafka receiver actor, and if I run it outside, it’s not clear how to specify
ctx.source(getmessages())
when I have many listener actors. Do I create a context per listener? Can I merge them together at the end?
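To make the "merge them together at the end" option concrete, here is a minimal, framework-free sketch of a fan-in. It assumes several listeners (plain threads standing in for the Kafka listener actors; the `listener` and `merged_messages` names are hypothetical) each push messages into one shared queue, and a single generator drains that queue. The result is one merged iterable that a single context could consume. This only illustrates the shape of the question. It is not ray.streaming's API and does not talk to Kafka.

```python
import queue
import threading

_DONE = object()  # sentinel each listener puts on the queue when it finishes


def listener(partition, messages, out_q):
    """Hypothetical stand-in for one Kafka listener actor reading one partition."""
    for msg in messages:
        out_q.put((partition, msg))
    out_q.put(_DONE)  # signal that this listener has no more messages


def merged_messages(partitions):
    """Fan in the output of several listeners into a single iterator."""
    out_q = queue.Queue()
    threads = [
        threading.Thread(target=listener, args=(p, msgs, out_q))
        for p, msgs in partitions.items()
    ]
    for t in threads:
        t.start()
    remaining = len(threads)
    while remaining:
        item = out_q.get()
        if item is _DONE:
            remaining -= 1  # one listener finished
        else:
            yield item
    for t in threads:
        t.join()


if __name__ == "__main__":
    data = {0: ["a", "b"], 1: ["c"], 2: ["d", "e", "f"]}
    for partition, msg in merged_messages(data):
        print(partition, msg)
```

Messages from different listeners interleave in arrival order, but each listener's own messages keep their relative order, because one thread writes them to a FIFO queue before its sentinel.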
I did a bit more digging and found that StreamingContext.Builder().build()
fails with the error:
ValueError: Cross language feature needs --load-code-from-local to be set.
A quick search suggests setting
ray.init(_load_code_from_local=True)
But my version
__commit__ = "ee291ae762325f8e70bc3fbd5652b563744d22b2"
__version__ = "2.0.0.dev0"
does not seem to support it.
I finally found a solution in ray/wordcount.py (ray-project/ray on GitHub, master branch). It fixed
ValueError: Cross language feature needs --load-code-from-local to be set.
Unfortunately, the sample itself does not work. The error I get when running it is:
2021-08-28 19:40:54,055 INFO services.py:1265 -- View the Ray dashboard at http://127.0.0.1:8265
(raylet) WARNING: An illegal reflective access operation has occurred
(raylet) WARNING: Illegal reflective access by org.nustaq.serialization.FSTClazzInfo (file:/usr/local/lib/python3.7/site-packages/ray/jars/ray_dist.jar) to field java.lang.String.value
(raylet) WARNING: Please consider reporting this to the maintainers of org.nustaq.serialization.FSTClazzInfo
(raylet) WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
(raylet) WARNING: All illegal access operations will be denied in a future release
Traceback (most recent call last):
File "/Users/boris/Projects/scalingpythonml/streaming/ray_streaming/wordcount.py", line 73, in <module>
.option(Config.CHANNEL_TYPE, Config.NATIVE_CHANNEL) \
File "/usr/local/lib/python3.7/site-packages/ray/streaming/context.py", line 48, in build
ctx = StreamingContext()
File "/usr/local/lib/python3.7/site-packages/ray/streaming/context.py", line 54, in __init__
self._j_ctx = self._gateway_client.create_streaming_context()
File "/usr/local/lib/python3.7/site-packages/ray/streaming/runtime/gateway_client.py", line 21, in create_streaming_context
return deserialize(ray.get(call))
File "/usr/local/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 89, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/ray/worker.py", line 1631, in get
raise value
ray.exceptions.CrossLanguageError: An exception raised from JAVA:
io.ray.runtime.exception.RayActorException: (pid=5454, ip=192.168.0.2) The actor died because of it's creation task failed
at io.ray.runtime.task.TaskExecutor.execute(TaskExecutor.java:216)
at io.ray.runtime.RayNativeRuntime.nativeRunTaskExecutor(Native Method)
at io.ray.runtime.RayNativeRuntime.run(RayNativeRuntime.java:225)
at io.ray.runtime.runner.worker.DefaultWorker.main(DefaultWorker.java:15)
Caused by: java.lang.RuntimeException: Failed to load functions from class io.ray.streaming.runtime.python.PythonGateway
at io.ray.runtime.functionmanager.FunctionManager$JobFunctionTable.loadFunctionsForClass(FunctionManager.java:240)
at io.ray.runtime.functionmanager.FunctionManager$JobFunctionTable.getFunction(FunctionManager.java:173)
at io.ray.runtime.functionmanager.FunctionManager.getFunction(FunctionManager.java:108)
at io.ray.runtime.task.TaskExecutor.getRayFunction(TaskExecutor.java:69)
at io.ray.runtime.task.TaskExecutor.execute(TaskExecutor.java:121)
... 3 more
Caused by: java.lang.ClassNotFoundException: io.ray.streaming.runtime.python.PythonGateway
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:435)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:468)
at io.ray.runtime.functionmanager.FunctionManager$JobFunctionTable.loadFunctionsForClass(FunctionManager.java:199)
... 7 more
After more investigation, it looks like using
ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
breaks any application that uses actors. Is this a bug?