Ray streaming with Kafka

There is an excellent blog post, Anyscale - Serverless Kafka Stream Processing with Ray, on running Kafka with Ray. At the end it references the ray/streaming library (ray/streaming at master · ray-project/ray · GitHub). Are there any examples of integrating this library with Kafka?

The issue is that you can't run StreamingContext.Builder().build() inside a Kafka receiver actor, and if I run it outside, it's not clear how to specify

ctx.source(getmessages())

for the case when I have many listener actors. Do I create a context per listener? Can I merge them together at the end?
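
To make the "merge them together" idea concrete, here is a minimal sketch of one possible approach: fan all listeners into a single queue and expose one iterator. It uses plain threads rather than Ray actors, and `fake_listener` is a hypothetical stand-in for a Kafka consumer loop. Whether ctx.source() can consume such an iterator is not something I have confirmed.

```python
import queue
import threading
from typing import Callable, Iterable, Iterator, List

_DONE = object()  # sentinel: one listener has finished


def merged_source(listeners: List[Callable[[], Iterable[str]]]) -> Iterator[str]:
    """Yield messages from all listeners through one shared queue."""
    q: queue.Queue = queue.Queue()

    def pump(listen: Callable[[], Iterable[str]]) -> None:
        # Forward every message from one listener, then signal completion.
        try:
            for msg in listen():
                q.put(msg)
        finally:
            q.put(_DONE)

    for listen in listeners:
        threading.Thread(target=pump, args=(listen,), daemon=True).start()

    remaining = len(listeners)
    while remaining:
        item = q.get()
        if item is _DONE:
            remaining -= 1
        else:
            yield item


def fake_listener(topic: str, n: int) -> Callable[[], Iterable[str]]:
    # Hypothetical stand-in for a Kafka consumer; emits n messages and stops.
    return lambda: (f"{topic}-{i}" for i in range(n))


# Arrival order across listeners is not deterministic, so sort for display.
messages = sorted(merged_source([fake_listener("a", 2), fake_listener("b", 3)]))
print(messages)
```

With this shape there would be a single context fed by one merged source, instead of one context per listener.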

I did a bit more digging and see that StreamingContext.Builder().build() fails with the error:

ValueError: Cross language feature needs --load-code-from-local to be set.

A quick search suggests setting

ray.init(_load_code_from_local=True)

But my version

__commit__ = "ee291ae762325f8e70bc3fbd5652b563744d22b2"
__version__ = "2.0.0.dev0"

does not seem to support it.
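
A quick way to check whether a given function declares a keyword, without trial and error, is to inspect its signature. This is a sketch only: `fake_init` and its parameter list are hypothetical stand-ins so that the snippet runs without Ray installed. With Ray available, one would pass ray.init instead. Note that a function accepting `**kwargs` may still take names this check reports as missing.

```python
import inspect
from typing import Callable


def accepts_keyword(func: Callable, name: str) -> bool:
    """Return True if `func` explicitly declares a parameter called `name`."""
    return name in inspect.signature(func).parameters


def fake_init(address=None, *, job_config=None, **kwargs):
    # Hypothetical stand-in for ray.init; the real signature may differ.
    return None


has_flag = accepts_keyword(fake_init, "_load_code_from_local")
has_job_config = accepts_keyword(fake_init, "job_config")
print(has_flag, has_job_config)
```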

I finally found a solution here: ray/wordcount.py at master · ray-project/ray · GitHub. It fixed

ValueError: Cross language feature needs --load-code-from-local to be set.

Unfortunately, the sample itself does not work. The error I get when running it is:

2021-08-28 19:40:54,055	INFO services.py:1265 -- View the Ray dashboard at http://127.0.0.1:8265
(raylet) WARNING: An illegal reflective access operation has occurred
(raylet) WARNING: Illegal reflective access by org.nustaq.serialization.FSTClazzInfo (file:/usr/local/lib/python3.7/site-packages/ray/jars/ray_dist.jar) to field java.lang.String.value
(raylet) WARNING: Please consider reporting this to the maintainers of org.nustaq.serialization.FSTClazzInfo
(raylet) WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
(raylet) WARNING: All illegal access operations will be denied in a future release
Traceback (most recent call last):
  File "/Users/boris/Projects/scalingpythonml/streaming/ray_streaming/wordcount.py ", line 73, in <module>
    .option(Config.CHANNEL_TYPE, Config.NATIVE_CHANNEL) \
  File "/usr/local/lib/python3.7/site-packages/ray/streaming/context.py", line 48, in build
    ctx = StreamingContext()
  File "/usr/local/lib/python3.7/site-packages/ray/streaming/context.py", line 54, in __init__
    self._j_ctx = self._gateway_client.create_streaming_context()
  File "/usr/local/lib/python3.7/site-packages/ray/streaming/runtime/gateway_client.py", line 21, in create_streaming_context
    return deserialize(ray.get(call))
  File "/usr/local/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 89, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/ray/worker.py", line 1631, in get
    raise value
ray.exceptions.CrossLanguageError: An exception raised from JAVA:
io.ray.runtime.exception.RayActorException: (pid=5454, ip=192.168.0.2) The actor died because of it's creation task failed
	at io.ray.runtime.task.TaskExecutor.execute(TaskExecutor.java:216)
	at io.ray.runtime.RayNativeRuntime.nativeRunTaskExecutor(Native Method)
	at io.ray.runtime.RayNativeRuntime.run(RayNativeRuntime.java:225)
	at io.ray.runtime.runner.worker.DefaultWorker.main(DefaultWorker.java:15)
Caused by: java.lang.RuntimeException: Failed to load functions from class io.ray.streaming.runtime.python.PythonGateway
	at io.ray.runtime.functionmanager.FunctionManager$JobFunctionTable.loadFunctionsForClass(FunctionManager.java:240)
	at io.ray.runtime.functionmanager.FunctionManager$JobFunctionTable.getFunction(FunctionManager.java:173)
	at io.ray.runtime.functionmanager.FunctionManager.getFunction(FunctionManager.java:108)
	at io.ray.runtime.task.TaskExecutor.getRayFunction(TaskExecutor.java:69)
	at io.ray.runtime.task.TaskExecutor.execute(TaskExecutor.java:121)
	... 3 more
Caused by: java.lang.ClassNotFoundException: io.ray.streaming.runtime.python.PythonGateway
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:435)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Class.java:468)
	at io.ray.runtime.functionmanager.FunctionManager$JobFunctionTable.loadFunctionsForClass(FunctionManager.java:199)
	... 7 more
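
Since the root cause in the trace is a ClassNotFoundException, one diagnostic is to check whether a given jar actually contains the class. The sketch below uses only the standard library. Which jar is supposed to ship PythonGateway is an assumption on my part; the ray_dist.jar path in the comment is the one from the log above.

```python
import zipfile


def jar_has_class(jar_path: str, class_name: str) -> bool:
    """Return True if the jar contains the .class entry for `class_name`."""
    # A jar is a zip archive; a.b.C is stored as a/b/C.class.
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()


# Example call (path taken from the log above, adjust to your install):
# jar_has_class(
#     "/usr/local/lib/python3.7/site-packages/ray/jars/ray_dist.jar",
#     "io.ray.streaming.runtime.python.PythonGateway",
# )
```

If this returns False for every jar on the search path, the streaming classes are simply not being shipped or found, which would match the error.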

More investigation here. It looks like using

    ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))

breaks any application that uses actors. Is this a bug?