Ray fails to pickle preprocess function

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

I tried to do this type of preprocessing for my dataset:

import io

import boto3
import numpy as np
from PIL import Image

# .Bucket() is part of the boto3 resource API, so build a resource here.
s3_client = boto3.resource("s3")


def split_s3_path(s3_path):
    """Split an s3://bucket/key URL into (bucket, key)."""
    path_parts = s3_path.replace("s3://", "").split("/")
    bucket = path_parts.pop(0)
    key = "/".join(path_parts)
    return bucket, key


def download_url(s3_url):
    """Download the object at s3_url and return it as a PIL image."""
    bucket_name, path = split_s3_path(s3_url)
    print(f"Downloading from {bucket_name} {path}")
    bucket = s3_client.Bucket(bucket_name)
    s3_obj = bucket.Object(path)
    file_stream = io.BytesIO()
    s3_obj.download_fileobj(file_stream)
    return Image.open(file_stream)


def preprocess(row):
    """Row-level transform: fetch the image and return it as a NumPy array."""
    img = download_url(row["image_path"])
    print("Downloaded!")
    return {"img": np.asarray(img)}


# ds is the Ray Dataset created earlier (not shown here).
transformed_ds = ds.map(preprocess)

However, it failed with the following error: Can't pickle <built-in function input>: it's not the same object as builtins.input.

It looks like Ray fails to pickle the preprocess function. Is there a way to turn this off?

The full traceback is here:

---------------------------------------------------------------------------
PicklingError                             Traceback (most recent call last)
/tmp/ipykernel_20143/753781901.py in <cell line: 1>()
----> 1 transformed_ds.take(2)

/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/data/dataset.py in take(self, limit)
   2249         """
   2250         output = []
-> 2251         for row in self.iter_rows():
   2252             output.append(row)
   2253             if len(output) >= limit:

/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/data/dataset_iterator.py in iter_rows(self, prefetch_blocks)
    230             iter_batch_args["prefetch_batches"] = prefetch_blocks
    231 
--> 232         for batch in self.iter_batches(**iter_batch_args):
    233             batch = BlockAccessor.for_block(BlockAccessor.batch_to_block(batch))
    234             for row in batch.iter_rows():

/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/data/dataset_iterator.py in iter_batches(self, prefetch_batches, batch_size, batch_format, drop_last, local_shuffle_buffer_size, local_shuffle_seed, _collate_fn, prefetch_blocks)
    163         time_start = time.perf_counter()
    164 
--> 165         block_iterator, stats, blocks_owned_by_consumer = self._to_block_iterator()
    166         if use_legacy:
    167             # Legacy iter_batches does not use metadata.

/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/data/_internal/dataset_iterator/dataset_iterator_impl.py in _to_block_iterator(self)
     29     ]:
     30         ds = self._base_dataset
---> 31         block_iterator, stats, executor = ds._plan.execute_to_iterator()
     32         ds._current_executor = executor
     33         return block_iterator, stats, False

/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/data/_internal/plan.py in execute_to_iterator(self, allow_clear_input_blocks, force_read)
    528         gen = iter(block_iter)
    529         try:
--> 530             block_iter = itertools.chain([next(gen)], gen)
    531         except StopIteration:
    532             pass

/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_compat.py in execute_to_legacy_block_iterator(executor, plan, allow_clear_input_blocks, dataset_uuid)
     44 ) -> Iterator[Tuple[ObjectRef[Block], BlockMetadata]]:
     45     """Same as execute_to_legacy_bundle_iterator but returning blocks and metadata."""
---> 46     bundle_iter = execute_to_legacy_bundle_iterator(
     47         executor, plan, allow_clear_input_blocks, dataset_uuid
     48     )

/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_compat.py in execute_to_legacy_bundle_iterator(executor, plan, allow_clear_input_blocks, dataset_uuid, dag_rewrite)
     82         dag = dag_rewrite(dag)
     83 
---> 84     bundle_iter = executor.execute(dag, initial_stats=stats)
     85     return bundle_iter
     86 

/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py in execute(self, dag, initial_stats)
     86         # Setup the streaming DAG topology and start the runner thread.
     87         _validate_dag(dag, self._get_or_refresh_resource_limits())
---> 88         self._topology, _ = build_streaming_topology(dag, self._options)
     89 
     90         if not isinstance(dag, InputDataBuffer):

/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor_state.py in build_streaming_topology(dag, options)
    277         return op_state
    278 
--> 279     setup_state(dag)
    280 
    281     # Create the progress bars starting from the first operator to run.

/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor_state.py in setup_state(op)
    274         op_state = OpState(op, inqueues)
    275         topology[op] = op_state
--> 276         op.start(options)
    277         return op_state
    278 

/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_operator.py in start(self, options)
    163         # Put the function def in the object store to avoid repeated serialization
    164         # in case it's large (i.e., closure captures large objects).
--> 165         self._transform_fn_ref = ray.put(self._transform_fn)
    166 
    167     def add_input(self, refs: RefBundle, input_index: int):

/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/_private/client_mode_hook.py in wrapper(*args, **kwargs)
    103             if func.__name__ != "init" or is_client_mode_enabled_by_default:
    104                 return getattr(ray, func.__name__)(*args, **kwargs)
--> 105         return func(*args, **kwargs)
    106 
    107     return wrapper

/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/_private/worker.py in put(value, _owner)
   2591     with profiling.profile("ray.put"):
   2592         try:
-> 2593             object_ref = worker.put_object(value, owner_address=serialize_owner_address)
   2594         except ObjectStoreFullError:
   2595             logger.info(

/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/_private/worker.py in put_object(self, value, object_ref, owner_address)
    674 
    675         try:
--> 676             serialized_value = self.get_serialization_context().serialize(value)
    677         except TypeError as e:
    678             sio = io.StringIO()

/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/_private/serialization.py in serialize(self, value)
    464             return RawSerializedObject(value)
    465         else:
--> 466             return self._serialize_to_msgpack(value)

/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/_private/serialization.py in _serialize_to_msgpack(self, value)
    442         if python_objects:
    443             metadata = ray_constants.OBJECT_METADATA_TYPE_PYTHON
--> 444             pickle5_serialized_object = self._serialize_to_pickle5(
    445                 metadata, python_objects
    446             )

/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/_private/serialization.py in _serialize_to_pickle5(self, metadata, value)
    404         except Exception as e:
    405             self.get_and_clear_contained_object_refs()
--> 406             raise e
    407         finally:
    408             self.set_out_of_band_serialization()

/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/_private/serialization.py in _serialize_to_pickle5(self, metadata, value)
    399         try:
    400             self.set_in_band_serialization()
--> 401             inband = pickle.dumps(
    402                 value, protocol=5, buffer_callback=writer.buffer_callback
    403             )

/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/cloudpickle/cloudpickle_fast.py in dumps(obj, protocol, buffer_callback)
     86         with io.BytesIO() as file:
     87             cp = CloudPickler(file, protocol=protocol, buffer_callback=buffer_callback)
---> 88             cp.dump(obj)
     89             return file.getvalue()
     90 

/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/cloudpickle/cloudpickle_fast.py in dump(self, obj)
    731     def dump(self, obj):
    732         try:
--> 733             return Pickler.dump(self, obj)
    734         except RuntimeError as e:
    735             if "recursion" in e.args[0]:

Hi @Peter_Qian, do you have any code that uses input? It looks like input gets captured somewhere, and it cannot be pickled; from the code you provided, I cannot find the issue. It also looks like you are reading the images from S3. Is there any reason you cannot use the Ray Data read_images API directly?

ds = ray.data.read_images(paths)
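
read_images can take the S3 paths directly, so the manual boto3/PIL download code goes away entirely. A minimal sketch, assuming all of your images live under a single S3 prefix (the bucket and prefix below are placeholders, not taken from your post):

import ray

# Reads every image under the prefix into a Dataset with an "image" column
# of NumPy arrays, downloading from S3 in parallel across the cluster.
ds = ray.data.read_images("s3://your-bucket/your-prefix/")

Separately, if you want to track down which captured object is failing to pickle, you can point Ray's serializability checker at the function before handing it to map. A minimal sketch, assuming preprocess is defined as in your snippet:

from ray.util import inspect_serializability

# Walks the globals and closure referenced by `preprocess` and reports
# which captured object (here, presumably the built-in `input`) cannot
# be serialized with cloudpickle.
inspect_serializability(preprocess, name="preprocess")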