Ray is installed on kubernetes with 1CPU and 5GB RAM per worker and the head.
My expectation was somehow that Ray would not only try to balance work with respect to CPU but also memory usage w.r.t. available RAM. When spawning a lot of tasks i get good CPU usage on all workers but RAM is in the range of 300MB out of 5GB while the head is growing fuller and fuller and finally drops the connection.
Is this likely a configuration issue or a known problem? (I generate workload on the cluster from a jupyter notebook running on my local machine i.e. outside of the cluster)
Any ideas or hints on how to fix or investigate this are very welcome!
Additional details:
- first cell of the jupyter notebook running locally on my laptop
import ray
LOCAL_PORT = 10001
ray.client(f"127.0.0.1:{LOCAL_PORT}").connect()
# There is a kubectl portforward running for port 10001 from my laptop to the head node
- typical workload
import pandas as pd
df = pd.from_parquet("a_15_mb_file") # global variable that does not change
mapping_df = ... # a global variable much smaller than df that maps index items of df to a few (~0-30) other index items
def local_functon(a, b, df):
set_a = set(df.loc[a, "some_column_containing_a_list"] )
set_b = set(df.loc[b, "some_column_containing_a_list"] )
return len(set_a.intersection(set(b)))
@ray.remote
def remote_function(item):
other_items = mapping_df.loc[item, "other"]
results = []
for other in other_items:
results.append({other: local_function(item, other)}
return results
refs = [remote_function.remote(item) for item in df.index[:2000]] # for testing purposes i run it only with a few thousand at a time
# first execution is slow due to upload of global vars. afterwards its fast.
ray.get(refs)
# every time i repeat the calls ram usage is increasing a bit. every time i change any function ram usage is increasing a lot. like the old objects don'T get garbage collected.
refs = [remote_function.remote(item) for item in df.index[:2000]]
ray.get(refs)
refs = [remote_function.remote(item) for item in df.index[:2000]]
ray.get(refs)
...
refs = [remote_function.remote(item) for item in df.index[:2000]]
ray.get(refs)
# Client looses connection when head RAM is full (5GB) in the meanwhile more and more wokers spawned (lets say 6 of them) each of them has only few hundred MB of RAM in use