I have a question, please.
A bit of context: I am working on an ETL-like pipeline. I download a bunch of CSV files from an S3 bucket, read them, and concatenate them into two pandas DataFrames; all of that is done with plain multiprocessing for now. For some entries in the first DataFrame, I want to find the best match in the second one based on certain features, using KNN.
How I use Ray: I have a function (a Ray task) that runs KNN. This function gets references to both DataFrames and some other args, such as which columns to use for matching, an index, etc. Using the index provided, the function slices the first DataFrame to get the single entry it will be searching a match for. Once the match is found, it returns another DataFrame with information such as distance, match id, etc.
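To make the matching step concrete, here is roughly what a single match does, using plain dicts instead of DataFrames (the feature names and the `nearest_match` helper are made up for illustration, not my actual code):

```python
import math


def nearest_match(target, candidates, features):
    """Return (best_index, best_distance) over candidates,
    comparing on the given feature columns with Euclidean distance.
    Toy stand-in for the KNN step on a single row of df1 vs all of df0."""
    best_i, best_d = None, math.inf
    for i, cand in enumerate(candidates):
        d = math.sqrt(sum((target[f] - cand[f]) ** 2 for f in features))
        if d < best_d:
            best_i, best_d = i, d
    return best_i, best_d
```

In the real pipeline this runs once per selected row of the first DataFrame, and the result row (distance, match id, etc.) is what each task returns.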
Some pseudo code:
```python
import pandas as pd
import ray


@ray.remote
def run_matching(i, df1, df0, features):
    to_match = df1.loc[i, :]
    # Run matching against df0 using the features provided
    # Return matching results + some other related stuff
    return pd.DataFrame()


def main():
    # Download CSVs
    # Read CSVs, concat into 2 DataFrames
    df1 = pd.DataFrame()
    df0 = pd.DataFrame()

    # Move the DataFrames to the object store
    df1_ref = ray.put(df1)
    df0_ref = ray.put(df0)

    total_to_match = df1.shape[0]

    # Launch tasks
    futures = [
        run_matching.remote(i, df1_ref, df0_ref, ["feature1", "feature2"])
        for i in range(total_to_match)
    ]
    output = ray.get(futures)
    # Postprocess output by concatenating the DataFrames and uploading results


if __name__ == "__main__":
    ray.init()
    main()
    ray.shutdown()
```
Everything seems to work fine at first: the DataFrames get successfully moved to the object store, but then everything freezes and I get a single message: Killed. In addition, when I check the processes running on the VM afterwards, there are still a bunch of ray processes left.
The total number of tasks I might need to run is ~5-10k (certain combinations in df1). I tried running with just 10-20 tasks, with the same result.
- Could anyone explain what is happening? Am I doing something wrong?
- If I have thousands of tasks and launch them all with a list comprehension, does Ray limit the number of tasks running simultaneously based on the machine's characteristics?
- Should I consider actors instead of tasks, given that I have several thousand of them to run? Currently we use a multiprocessing pool for this. I decided to rewrite it using Ray so we can use a smaller VM RAM-wise (no need to copy the DataFrames into each process) and potentially run it on a cluster. Each matching run doesn't take a lot of time; there are just a lot of them. So is having N actors (N = number of cores?) a better idea, to avoid the overhead of creating a new task for each match?
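What I have in mind for the actor/batching variant is splitting the row indices into contiguous chunks and handing each worker a whole chunk instead of a single row. The `run_matching_batch` task in the comment is hypothetical; the chunking itself would be something like:

```python
def chunk_indices(n_rows, n_chunks):
    """Split range(n_rows) into at most n_chunks roughly equal contiguous chunks."""
    step = -(-n_rows // n_chunks)  # ceiling division
    return [
        list(range(start, min(start + step, n_rows)))
        for start in range(0, n_rows, step)
    ]


# Each chunk would then go to one task/actor, e.g. (run_matching_batch is hypothetical):
# futures = [
#     run_matching_batch.remote(chunk, df1_ref, df0_ref, features)
#     for chunk in chunk_indices(df1.shape[0], num_workers)
# ]
```

So instead of 5-10k tiny tasks there would be only num_workers larger ones. Is that the recommended pattern, or does Ray's own scheduling make the per-task overhead negligible?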
- How do I give Ray unlimited resources, i.e. let it use all cores and all RAM? Or does it assume it can use everything unless stated otherwise?
Thank you in advance!