Cannot pickle '_thread.lock' object

Hi, I recently came across Ray and was playing around with it.
I was trying to read around 500 PDFs from a directory, embed the data, and store the results in a vector store (Weaviate in this case), and I'm running into an error. I have attached the code and the entire traceback below.
Code:

from io import BytesIO

import ray
import weaviate
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.schema import Document
from langchain.vectorstores.weaviate import Weaviate
from pypdf import PdfReader

ray.init()
ray.data.DataContext.get_current().execution_options.verbose_progress = True

pdf_path = "/home/sln/VFS/datasets-2020445568-2020445568/data"  # Directory of PDFs

ds = ray.data.read_binary_files(
    pdf_path, include_paths=True, partition_filter=ray.data.datasource.FileExtensionFilter("pdf")
)

WEAVIATE_URL = "http://127.0.0.1:8080"
wclient = weaviate.Client(url="http://localhost:7080")

embedding = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-mpnet-base-v2",
    model_kwargs={"device": "cpu"},
    encode_kwargs={"normalize_embeddings": False},
)

weaviatedb = Weaviate(
    client=wclient,
    by_text=False,
    index_name="Testing",
    text_key="test",
    embedding=embedding,
)

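def split_text(row: dict):
    # Each row from read_binary_files holds the raw file under "bytes" and its location under "path".
    pdf_file_obj = BytesIO(row["bytes"])
    pdf_reader = PdfReader(pdf_file_obj)
    documents = [
        Document(
            page_content=page.extract_text(),
            metadata={"source": row.get("path"), "page": page_number},
        )
        for page_number, page in enumerate(pdf_reader.pages)
    ]

    # weaviatedb is the module-level object defined above and is captured by this function.
    weaviatedb.add_documents(documents)

    # flat_map expects an iterable of rows; nothing is passed downstream here.
    return []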

ds = ds.flat_map(split_text)

text_and_embeddings = list(ds.iter_rows())

Error:

 /home/sln/cstack_projects/venv-cstack/bin/python3.8 /home/sln/cstack_projects/genai-stack/rayy.py  
/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/_private/node.py:1160: ResourceWarning: unclosed file <_io.TextIOWrapper name='/tmp/ray/session_2023-09-12_18-08-11_687458_93753/logs/gcs_server.out' mode='a' encoding='utf-8'>
  self.start_gcs_server()
ResourceWarning: Enable tracemalloc to get the object allocation traceback
/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/_private/node.py:1160: ResourceWarning: unclosed file <_io.TextIOWrapper name='/tmp/ray/session_2023-09-12_18-08-11_687458_93753/logs/gcs_server.err' mode='a' encoding='utf-8'>
  self.start_gcs_server()
ResourceWarning: Enable tracemalloc to get the object allocation traceback
/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/_private/node.py:1165: ResourceWarning: unclosed file <_io.TextIOWrapper name='/tmp/ray/session_2023-09-12_18-08-11_687458_93753/logs/monitor.out' mode='a' encoding='utf-8'>
  self.start_monitor()
ResourceWarning: Enable tracemalloc to get the object allocation traceback
/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/_private/node.py:1165: ResourceWarning: unclosed file <_io.TextIOWrapper name='/tmp/ray/session_2023-09-12_18-08-11_687458_93753/logs/monitor.err' mode='a' encoding='utf-8'>
  self.start_monitor()
ResourceWarning: Enable tracemalloc to get the object allocation traceback
/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/_private/node.py:1181: ResourceWarning: unclosed file <_io.TextIOWrapper name='/tmp/ray/session_2023-09-12_18-08-11_687458_93753/logs/dashboard.err' mode='a' encoding='utf-8'>
  self.start_api_server(
ResourceWarning: Enable tracemalloc to get the object allocation traceback
/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/_private/node.py:1223: ResourceWarning: unclosed file <_io.TextIOWrapper name='/tmp/ray/session_2023-09-12_18-08-11_687458_93753/logs/raylet.out' mode='a' encoding='utf-8'>
  self.start_raylet(plasma_directory, object_store_memory)
ResourceWarning: Enable tracemalloc to get the object allocation traceback
/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/_private/node.py:1223: ResourceWarning: unclosed file <_io.TextIOWrapper name='/tmp/ray/session_2023-09-12_18-08-11_687458_93753/logs/raylet.err' mode='a' encoding='utf-8'>
  self.start_raylet(plasma_directory, object_store_memory)
ResourceWarning: Enable tracemalloc to get the object allocation traceback
/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/_private/node.py:1225: ResourceWarning: unclosed file <_io.TextIOWrapper name='/tmp/ray/session_2023-09-12_18-08-11_687458_93753/logs/log_monitor.err' mode='a' encoding='utf-8'>
  self.start_log_monitor()
ResourceWarning: Enable tracemalloc to get the object allocation traceback
2023-09-12 18:08:13,888 INFO worker.py:1612 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
2023-09-12 18:08:20,424 INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadBinary->FlatMap(split_text)]
2023-09-12 18:08:20,424 INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=True)
Traceback (most recent call last):
  File "/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/_private/worker.py", line 687, in put_object
    serialized_value = self.get_serialization_context().serialize(value)
  File "/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/_private/serialization.py", line 468, in serialize
    return self._serialize_to_msgpack(value)
  File "/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/_private/serialization.py", line 446, in _serialize_to_msgpack
    pickle5_serialized_object = self._serialize_to_pickle5(
  File "/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/_private/serialization.py", line 408, in _serialize_to_pickle5
    raise e
  File "/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/_private/serialization.py", line 403, in _serialize_to_pickle5
    inband = pickle.dumps(
  File "/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/cloudpickle/cloudpickle_fast.py", line 88, in dumps
    cp.dump(obj)
  File "/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/cloudpickle/cloudpickle_fast.py", line 733, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle '_thread.lock' object

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/sln/cstack_projects/genai-stack/rayy.py", line 59, in <module>
    text_and_embeddings = list(ds.iter_batches())
  File "/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/data/iterator.py", line 159, in iter_batches
    block_iterator, stats, blocks_owned_by_consumer = self._to_block_iterator()
  File "/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/data/_internal/iterator/iterator_impl.py", line 32, in _to_block_iterator
    block_iterator, stats, executor = ds._plan.execute_to_iterator()
  File "/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/data/_internal/plan.py", line 538, in execute_to_iterator
    block_iter = itertools.chain([next(gen)], gen)
  File "/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/data/_internal/execution/legacy_compat.py", line 48, in execute_to_legacy_block_iterator
    bundle_iter = execute_to_legacy_bundle_iterator(
  File "/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/data/_internal/execution/legacy_compat.py", line 86, in execute_to_legacy_bundle_iterator
    bundle_iter = executor.execute(dag, initial_stats=stats)
  File "/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/data/_internal/execution/streaming_executor.py", line 103, in execute
    self._topology, _ = build_streaming_topology(dag, self._options)
  File "/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 299, in build_streaming_topology
    setup_state(dag)
  File "/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 296, in setup_state
    op.start(options)
  File "/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 179, in start
    self._transform_fn_ref = ray.put(self._transform_fn)
  File "/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/_private/worker.py", line 2597, in put
    object_ref = worker.put_object(value, owner_address=serialize_owner_address)
  File "/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/_private/worker.py", line 696, in put_object
    raise TypeError(msg) from e
TypeError: Could not serialize the put value <function OperatorFusionRule._get_fused_map_operator.<locals>.fused_map_transform_fn at 0x7fb184fe55e0>:
================================================================================
Checking Serializability of <function OperatorFusionRule._get_fused_map_operator.<locals>.fused_map_transform_fn at 0x7fb184fe55e0>
================================================================================
!!! FAIL serialization: cannot pickle '_thread.lock' object
Detected 2 nonlocal variables. Checking serializability...
    Serializing 'down_transform_fn' <function _plan_udf_map_op.<locals>.do_map at 0x7fb184fe5550>...
    !!! FAIL serialization: cannot pickle '_thread.lock' object
    Detected 3 nonlocal variables. Checking serializability...
        Serializing 'fn_args' (<function split_text at 0x7fb2557bfee0>,)...
        !!! FAIL serialization: cannot pickle '_thread.lock' object
        WARNING: Did not find non-serializable object in (<function split_text at 0x7fb2557bfee0>,). This may be an oversight.
================================================================================
Variable: 

        FailTuple(fn_args [obj=(<function split_text at 0x7fb2557bfee0>,), parent=<function _plan_udf_map_op.<locals>.do_map at 0x7fb184fe5550>])

was found to be non-serializable. There may be multiple other undetected variables that were non-serializable. 
Consider either removing the instantiation/imports of these variables or moving the instantiation into the scope of the function/class. 
================================================================================
Check https://docs.ray.io/en/master/ray-core/objects/serialization.html#troubleshooting for more information.
If you have any suggestions on how to improve this error message, please reach out to the Ray developers on github.com/ray-project/ray/issues/
================================================================================

/usr/lib64/python3.8/subprocess.py:946: ResourceWarning: subprocess 93818 is still running
  _warn("subprocess %s is still running" % self.pid,
ResourceWarning: Enable tracemalloc to get the object allocation traceback
Exception ignored in: <function StreamingExecutor.__del__ at 0x7fb255828160>
Traceback (most recent call last):
  File "/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/data/_internal/execution/streaming_executor.py", line 147, in __del__
    self.shutdown()
  File "/home/sln/cstack_projects/venv-cstack/lib64/python3.8/site-packages/ray/data/_internal/execution/streaming_executor.py", line 160, in shutdown
    self.join(timeout=2.0)
  File "/usr/lib64/python3.8/threading.py", line 1006, in join
    raise RuntimeError("cannot join thread before it is started")
RuntimeError: cannot join thread before it is started

Hi @shreehari, thanks for your interest in Ray Data.

It looks like the underlying cause is that the function split_text is not serializable by pickle (possibly because it references the module-level weaviatedb object, if I were to guess). The function needs to be serializable so that Ray can ship it to the worker processes that run it; you can read more about serialization in the Ray documentation.

Will check it out. Thanks for sharing the relevant resource, @sjl.