How severely does this issue affect your experience of using Ray?
- Medium: It makes completing my task significantly more difficult, but I can work around it.
I have the following script that reads some data from S3, processes it in batches, and then uploads the results to a new S3 bucket:
from math import ceil

import ray
from pyarrow import fs as arrow_fs
from ray.data import Dataset

items = ray.data.read_datasource(
    # custom data source
    TarDatasource(extra_tar_flags="--strip-components 2", profile=True),
    paths=S3_SOURCE_URL,
    filesystem=arrow_fs.S3FileSystem(...),
    include_paths=True,
)
total_items = items.count()
items = items.repartition(ceil(total_items / ITEMS_PER_SHARD))
shards = items.split(total_items // ITEMS_PER_SHARD, equal=False)

@ray.remote
def to_parquet(shard: Dataset, shard_idx: int):
    # Process one shard and write it out as a single Parquet file.
    shard = shard.map_batches(processing_function)
    shard = shard.repartition(1)
    shard.write_parquet(
        path=S3_DESTINATION_URL,
        fs=arrow_fs.S3FileSystem(...)
    )

ray.get([to_parquet.remote(shard, idx) for idx, shard in enumerate(shards)])
But I’m running into the following issue:
(_do_write pid=479888) 2022-10-07 10:18:00,830 ERROR serialization.py:354 -- S3 subsystem not initialized; please call InitializeS3() before carrying out any S3-related operation
(_do_write pid=479888) Traceback (most recent call last):
(_do_write pid=479888)   File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/_private/serialization.py", line 352, in deserialize_objects
(_do_write pid=479888)     obj = self._deserialize_object(data, metadata, object_ref)
(_do_write pid=479888)   File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/_private/serialization.py", line 241, in _deserialize_object
(_do_write pid=479888)     return self._deserialize_msgpack_data(data, metadata_fields)
(_do_write pid=479888)   File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/_private/serialization.py", line 196, in _deserialize_msgpack_data
(_do_write pid=479888)     python_objects = self._deserialize_pickle5_data(pickle5_data)
(_do_write pid=479888)   File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/_private/serialization.py", line 186, in _deserialize_pickle5_data
(_do_write pid=479888)     obj = pickle.loads(in_band)
(_do_write pid=479888)   File "pyarrow/_s3fs.pyx", line 237, in pyarrow._s3fs.S3FileSystem._reconstruct
(_do_write pid=479888)   File "pyarrow/_s3fs.pyx", line 227, in pyarrow._s3fs.S3FileSystem.__init__
(_do_write pid=479888)   File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
(_do_write pid=479888)   File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
(_do_write pid=479888) pyarrow.lib.ArrowInvalid: S3 subsystem not initialized; please call InitializeS3() before carrying out any S3-related operation
I’m not really sure what’s going on here. My understanding of how remote functions actually work is pretty limited. Specifically, I’m not sure what is being unpickled here.
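Going by the traceback, pickle.loads ends up in S3FileSystem._reconstruct and then S3FileSystem.__init__, so my best guess is that the filesystem object itself is what gets unpickled inside the _do_write worker, and that its constructor runs again there. Below is a minimal sketch of that mechanism as I understand it. Everything in it is hypothetical: FakeS3FileSystem, the _reconstruct helper, and the module-level flag are stand-ins I made up so it runs without pyarrow or S3, and it is not how pyarrow is actually implemented.

```python
import pickle

# Hypothetical stand-in for process-wide S3 state (not pyarrow's real mechanism).
_S3_INITIALIZED = False


def initialize_s3():
    """Mark the (fake) S3 subsystem as ready in this process."""
    global _S3_INITIALIZED
    _S3_INITIALIZED = True


def finalize_s3():
    """Mark the (fake) S3 subsystem as not ready, like a fresh worker process."""
    global _S3_INITIALIZED
    _S3_INITIALIZED = False


def _reconstruct(kwargs):
    # Called by pickle.loads: rebuilds the object by running __init__ again.
    return FakeS3FileSystem(**kwargs)


class FakeS3FileSystem:
    """Hypothetical filesystem whose constructor needs the subsystem to be ready."""

    def __init__(self, region):
        if not _S3_INITIALIZED:
            raise RuntimeError("S3 subsystem not initialized")
        self.region = region

    def __reduce__(self):
        # Pickle stores only the constructor arguments, not the live object.
        return _reconstruct, ({"region": self.region},)


# "Driver" side: the subsystem is ready, so creating and pickling works.
initialize_s3()
payload = pickle.dumps(FakeS3FileSystem(region="us-east-1"))

# "Worker" side before initialization: unpickling re-runs __init__ and fails.
finalize_s3()
try:
    pickle.loads(payload)
    error_message = None
except RuntimeError as exc:
    error_message = str(exc)

# "Worker" side after initialization: the same payload unpickles fine.
initialize_s3()
restored = pickle.loads(payload)

print(error_message)
print(restored.region)
```

If this is roughly what happens, the error would be about the state of the worker process at unpickle time rather than about the arguments I pass to the filesystem, but I have not confirmed that against the Ray or pyarrow source.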