Cannot use Ray with a Spark DataFrame inside a dict


Our application currently uses dictionaries to store information about the reports we need to run. One of the values in each dictionary is a pandas DataFrame, which starts out as a Spark DataFrame and is converted to pandas with `toPandas()`.

I am now experimenting with passing the Spark DataFrame directly instead, because we are running into memory issues with pandas. However, the function that uses Ray fails with `TypeError: cannot pickle '_thread.RLock' object`. From what I have read, this is related to serialization, but I am confused about why it works with a pandas DataFrame and fails with a Spark DataFrame.
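
To illustrate the serialization point: a pandas DataFrame holds its rows in local memory and pickles cleanly, whereas a Spark DataFrame is a lazy handle to data managed by the Spark driver and JVM. The sketch below assumes (not verified here) that the Spark DataFrame drags along driver-side objects that contain a thread lock. It uses a hypothetical `HypotheticalSparkHandle` class and the standard-library `pickle`, which only approximates Ray's own cloudpickle-based serializer, to reproduce the same error message.

```python
import pickle
import threading


class HypotheticalSparkHandle:
    """Hypothetical stand-in for a Spark DataFrame: it holds no rows,
    only a reference to driver-side state that includes a thread lock."""

    def __init__(self):
        self._lock = threading.RLock()  # assumed: lock inside a client/session object


def try_pickle(obj):
    """Return None if obj can be pickled, otherwise the error message."""
    try:
        pickle.dumps(obj)
        return None
    except TypeError as exc:
        return str(exc)


# Plain in-memory rows, like a pandas DataFrame after toPandas(): serializable
local_rows = {"df": [(1, "foo"), (2, "bar")], "tp": "Yearly", "pd": "02/21/2023"}
# A dict holding the driver-side handle instead: not serializable
remote_handle = {"df": HypotheticalSparkHandle(), "tp": "Yearly", "pd": "02/21/2023"}

print(try_pickle(local_rows))     # None
print(try_pickle(remote_handle))  # cannot pickle '_thread.RLock' object
```

Calling `toPandas()` before the Ray call is what turns the handle into plain local data, which would explain why that version works. It also collects every row onto the driver, which fits the memory issues described above.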

Here is a minimal example of what I am trying to do. Our real code does a lot of preprocessing and calculation inside that function:

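```python
import sys

import ray
from pyspark.sql import SparkSession

# Workaround for notebook environments where sys.stdout has no usable fileno()
sys.stdout.fileno = lambda: False

ray.init()  # connect to (or start) a Ray cluster
spark = SparkSession.builder.getOrCreate()  # already defined as `spark` in many notebooks


@ray.remote  # required for preprocess_df.remote(...) to exist
def preprocess_df(query_dict):
    # preprocess the data
    return query_dict


df1 = spark.createDataFrame(
    [(1, "foo"), (2, "bar")],
    ["id", "label"])
df2 = spark.createDataFrame(
    [(1, "foo1"), (2, "bar2")],
    ["id", "label"])

df1_dict = {}
df1_dict['df'] = df1  # works with df1.toPandas(); fails with the Spark DataFrame
df1_dict['tp'] = 'Yearly'
df1_dict['pd'] = '02/21/2023'

df2_dict = {}
df2_dict['df'] = df2  # works with df2.toPandas(); fails with the Spark DataFrame
df2_dict['tp'] = 'Yearly'
df2_dict['pd'] = '02/21/2023'

queries = [df1_dict, df2_dict]

# Raises: TypeError: cannot pickle '_thread.RLock' object
ray.get([preprocess_df.remote(query) for query in queries])
```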
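
Before calling `preprocess_df.remote(...)`, it can help to check which values in each query dict will not serialize. This is a minimal sketch that again uses the standard-library `pickle` as an approximation of Ray's serializer. `find_unpicklable` is a hypothetical helper, and `HypotheticalSparkHandle` is a hypothetical stand-in for the real Spark DataFrame. Ray's documentation also describes a built-in helper, `ray.util.inspect_serializability`, for tracing which nested attribute fails.

```python
import pickle
import threading


def find_unpicklable(query_dict):
    """Return the keys of query_dict whose values cannot be pickled.

    Standard-library pickle is used as an approximation of Ray's
    cloudpickle-based serializer."""
    bad_keys = []
    for key, value in query_dict.items():
        try:
            pickle.dumps(value)
        except Exception:  # TypeError, PicklingError, AttributeError, ...
            bad_keys.append(key)
    return bad_keys


class HypotheticalSparkHandle:
    """Hypothetical stand-in for a Spark DataFrame (holds a thread lock)."""

    def __init__(self):
        self._lock = threading.RLock()


queries = [
    {"df": HypotheticalSparkHandle(), "tp": "Yearly", "pd": "02/21/2023"},
    {"df": [(1, "foo1"), (2, "bar2")], "tp": "Yearly", "pd": "02/21/2023"},
]
for i, query in enumerate(queries):
    print(i, find_unpicklable(query))
# 0 ['df']
# 1 []
```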


Any help or advice would be greatly appreciated.

Thank you.