How severely does this issue affect your experience of using Ray?
- High: It blocks me from completing my task.
I tried to run the following preprocessing on my dataset:
import io

import boto3
import numpy as np
from PIL import Image

# Module-level S3 handle (a boto3 resource, since .Bucket() is used below).
s3_client = boto3.resource("s3")


def split_s3_path(s3_path):
    path_parts = s3_path.replace("s3://", "").split("/")
    bucket = path_parts.pop(0)
    key = "/".join(path_parts)
    return bucket, key


def download_url(s3_url):
    """
    Download from the s3_url and return the image.
    """
    bucket_name, path = split_s3_path(s3_url)
    print(f"Downloading from {bucket_name} {path}")
    bucket = s3_client.Bucket(bucket_name)
    s3_obj = bucket.Object(path)
    file_stream = io.BytesIO()
    s3_obj.download_fileobj(file_stream)
    return Image.open(file_stream)


def preprocess(row):
    img = download_url(row["image_path"])
    print("Downloaded!")
    return {"img": np.asarray(img)}


transformed_ds = ds.map(preprocess)
However, it failed with the following error: Can't pickle <built-in function input>: it's not the same object as builtins.input.
It looks like Ray fails to pickle the preprocess function (or something it captures). Is there a way to turn this serialization off, or otherwise work around it?
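In case it helps narrow things down, here is a variant I could try (just a sketch, not verified), which builds the S3 handle inside the mapped function so that nothing module-level has to be captured and pickled:

# Sketch only (untested): construct the boto3 resource inside the mapped
# function so Ray only needs to pickle the function itself, not a
# module-level S3 handle. Reuses split_s3_path from above.
def preprocess_lazy(row):
    # Created per call for simplicity; a callable class passed to map/map_batches
    # could cache the handle per worker instead.
    s3 = boto3.resource("s3")
    bucket_name, key = split_s3_path(row["image_path"])
    file_stream = io.BytesIO()
    s3.Bucket(bucket_name).Object(key).download_fileobj(file_stream)
    return {"img": np.asarray(Image.open(file_stream))}

transformed_ds = ds.map(preprocess_lazy)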
The full traceback is here:
---------------------------------------------------------------------------
PicklingError Traceback (most recent call last)
/tmp/ipykernel_20143/753781901.py in <cell line: 1>()
----> 1 transformed_ds.take(2)
/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/data/dataset.py in take(self, limit)
2249 """
2250 output = []
-> 2251 for row in self.iter_rows():
2252 output.append(row)
2253 if len(output) >= limit:
/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/data/dataset_iterator.py in iter_rows(self, prefetch_blocks)
230 iter_batch_args["prefetch_batches"] = prefetch_blocks
231
--> 232 for batch in self.iter_batches(**iter_batch_args):
233 batch = BlockAccessor.for_block(BlockAccessor.batch_to_block(batch))
234 for row in batch.iter_rows():
/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/data/dataset_iterator.py in iter_batches(self, prefetch_batches, batch_size, batch_format, drop_last, local_shuffle_buffer_size, local_shuffle_seed, _collate_fn, prefetch_blocks)
163 time_start = time.perf_counter()
164
--> 165 block_iterator, stats, blocks_owned_by_consumer = self._to_block_iterator()
166 if use_legacy:
167 # Legacy iter_batches does not use metadata.
/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/data/_internal/dataset_iterator/dataset_iterator_impl.py in _to_block_iterator(self)
29 ]:
30 ds = self._base_dataset
---> 31 block_iterator, stats, executor = ds._plan.execute_to_iterator()
32 ds._current_executor = executor
33 return block_iterator, stats, False
/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/data/_internal/plan.py in execute_to_iterator(self, allow_clear_input_blocks, force_read)
528 gen = iter(block_iter)
529 try:
--> 530 block_iter = itertools.chain([next(gen)], gen)
531 except StopIteration:
532 pass
/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_compat.py in execute_to_legacy_block_iterator(executor, plan, allow_clear_input_blocks, dataset_uuid)
44 ) -> Iterator[Tuple[ObjectRef[Block], BlockMetadata]]:
45 """Same as execute_to_legacy_bundle_iterator but returning blocks and metadata."""
---> 46 bundle_iter = execute_to_legacy_bundle_iterator(
47 executor, plan, allow_clear_input_blocks, dataset_uuid
48 )
/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_compat.py in execute_to_legacy_bundle_iterator(executor, plan, allow_clear_input_blocks, dataset_uuid, dag_rewrite)
82 dag = dag_rewrite(dag)
83
---> 84 bundle_iter = executor.execute(dag, initial_stats=stats)
85 return bundle_iter
86
/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py in execute(self, dag, initial_stats)
86 # Setup the streaming DAG topology and start the runner thread.
87 _validate_dag(dag, self._get_or_refresh_resource_limits())
---> 88 self._topology, _ = build_streaming_topology(dag, self._options)
89
90 if not isinstance(dag, InputDataBuffer):
/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor_state.py in build_streaming_topology(dag, options)
277 return op_state
278
--> 279 setup_state(dag)
280
281 # Create the progress bars starting from the first operator to run.
/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor_state.py in setup_state(op)
274 op_state = OpState(op, inqueues)
275 topology[op] = op_state
--> 276 op.start(options)
277 return op_state
278
/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_operator.py in start(self, options)
163 # Put the function def in the object store to avoid repeated serialization
164 # in case it's large (i.e., closure captures large objects).
--> 165 self._transform_fn_ref = ray.put(self._transform_fn)
166
167 def add_input(self, refs: RefBundle, input_index: int):
/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/_private/client_mode_hook.py in wrapper(*args, **kwargs)
103 if func.__name__ != "init" or is_client_mode_enabled_by_default:
104 return getattr(ray, func.__name__)(*args, **kwargs)
--> 105 return func(*args, **kwargs)
106
107 return wrapper
/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/_private/worker.py in put(value, _owner)
2591 with profiling.profile("ray.put"):
2592 try:
-> 2593 object_ref = worker.put_object(value, owner_address=serialize_owner_address)
2594 except ObjectStoreFullError:
2595 logger.info(
/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/_private/worker.py in put_object(self, value, object_ref, owner_address)
674
675 try:
--> 676 serialized_value = self.get_serialization_context().serialize(value)
677 except TypeError as e:
678 sio = io.StringIO()
/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/_private/serialization.py in serialize(self, value)
464 return RawSerializedObject(value)
465 else:
--> 466 return self._serialize_to_msgpack(value)
/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/_private/serialization.py in _serialize_to_msgpack(self, value)
442 if python_objects:
443 metadata = ray_constants.OBJECT_METADATA_TYPE_PYTHON
--> 444 pickle5_serialized_object = self._serialize_to_pickle5(
445 metadata, python_objects
446 )
/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/_private/serialization.py in _serialize_to_pickle5(self, metadata, value)
404 except Exception as e:
405 self.get_and_clear_contained_object_refs()
--> 406 raise e
407 finally:
408 self.set_out_of_band_serialization()
/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/_private/serialization.py in _serialize_to_pickle5(self, metadata, value)
399 try:
400 self.set_in_band_serialization()
--> 401 inband = pickle.dumps(
402 value, protocol=5, buffer_callback=writer.buffer_callback
403 )
/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/cloudpickle/cloudpickle_fast.py in dumps(obj, protocol, buffer_callback)
86 with io.BytesIO() as file:
87 cp = CloudPickler(file, protocol=protocol, buffer_callback=buffer_callback)
---> 88 cp.dump(obj)
89 return file.getvalue()
90
/opt/conda/envs/lab42/lib/python3.10/site-packages/ray/cloudpickle/cloudpickle_fast.py in dump(self, obj)
731 def dump(self, obj):
732 try:
--> 733 return Pickler.dump(self, obj)
734 except RuntimeError as e:
735 if "recursion" in e.args[0]: