Cannot use S3 inside of task?

How severe does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

I have the following script that reads some data from S3, batch processes it, and then uploads it back to a new S3 bucket.

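from math import ceil

import ray
from pyarrow import fs as arrow_fs  # assumed source of the arrow_fs alias
from ray.data import Dataset

items = ray.data.read_datasource(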
    # custom data source
    TarDatasource(extra_tar_flags="--strip-components 2", profile=True),
    paths=S3_SOURCE_URL,
    filesystem=arrow_fs.S3FileSystem(...),
    include_paths=True,
)


total_items = items.count()
items = items.repartition(ceil(total_items / ITEMS_PER_SHARD))
shards = items.split(total_items // ITEMS_PER_SHARD, equal=False)

@ray.remote
def to_parquet(shard: Dataset, shard_idx: int):
    shard = shard.map_batches(processing_function)
    shard = shard.repartition(1)
    shard.write_parquet(
        path=S3_DESTINATION_URL,
        fs=arrow_fs.S3FileSystem(...),
    )

ray.get([to_parquet.remote(shard, idx) for idx, shard in enumerate(shards)])

But I’m running into the following issue:

(_do_write pid=479888) 2022-10-07 10:18:00,830 ERROR serialization.py:354 -- S3 subsystem not initialized; please call InitializeS3() before carrying out any S3-related operation
(_do_write pid=479888) Traceback (most recent call last):
(_do_write pid=479888)   File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/_private/serialization.py", line 352, in deserialize_objects
(_do_write pid=479888)     obj = self._deserialize_object(data, metadata, object_ref)
(_do_write pid=479888)   File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/_private/serialization.py", line 241, in _deserialize_object
(_do_write pid=479888)     return self._deserialize_msgpack_data(data, metadata_fields)
(_do_write pid=479888)   File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/_private/serialization.py", line 196, in _deserialize_msgpack_data
(_do_write pid=479888)     python_objects = self._deserialize_pickle5_data(pickle5_data)
(_do_write pid=479888)   File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/_private/serialization.py", line 186, in _deserialize_pickle5_data
(_do_write pid=479888)     obj = pickle.loads(in_band)
(_do_write pid=479888)   File "pyarrow/_s3fs.pyx", line 237, in pyarrow._s3fs.S3FileSystem._reconstruct
(_do_write pid=479888)   File "pyarrow/_s3fs.pyx", line 227, in pyarrow._s3fs.S3FileSystem.__init__
(_do_write pid=479888)   File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
(_do_write pid=479888)   File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
(_do_write pid=479888) pyarrow.lib.ArrowInvalid: S3 subsystem not initialized; please call InitializeS3() before carrying out any S3-related operation

I’m not really sure what’s going on here. My understanding of how remote functions actually work is pretty limited; in particular, I’m not sure what is being unpickled here.

I figured out the issue: the correct kwarg is filesystem, not fs.
Specifically, it looks like write_parquet performs a remote write, so when I use the incorrect kwarg, the filesystem object does not go through the S3 filesystem serialization workaround, which causes the error in question.
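
To illustrate the mechanism described above, here is a minimal sketch in plain Python. It is not Ray's actual code. It assumes the writer takes a named filesystem parameter plus catch-all keyword arguments that are forwarded untouched to the underlying Parquet writer; write_parquet_sketch and FakeS3FileSystem are hypothetical names made up for this example.

```python
from typing import Any, Dict, Optional


class FakeS3FileSystem:
    """Hypothetical stand-in for a filesystem handle that needs special handling."""


def write_parquet_sketch(
    path: str,
    *,
    filesystem: Optional[Any] = None,
    **writer_args: Any,
) -> Dict[str, Any]:
    """Toy writer (an assumption, not Ray's API) that reports where each argument ended up."""
    return {
        "path": path,
        # Only the named parameter would receive filesystem-specific handling.
        "filesystem_handled": filesystem is not None,
        # Everything else is forwarded as-is, with no special handling.
        "forwarded_args": sorted(writer_args),
    }


fs_handle = FakeS3FileSystem()

# Misnamed kwarg: silently accepted, but it lands in the catch-all arguments.
wrong = write_parquet_sketch("s3://bucket/out", fs=fs_handle)
# Correct kwarg: bound to the named parameter.
right = write_parquet_sketch("s3://bucket/out", filesystem=fs_handle)

print(wrong)
print(right)
```

Under these assumptions, the misnamed argument raises no error at the call site, which would explain why the failure only shows up later, when the remote write task tries to unpickle the filesystem object.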

Hi @Vedant_Roy, glad you were able to figure this out. As a side question, is there a particular reason why you sharded your data and used tasks to execute the parquet writes, rather than just doing so directly on the dataset?

Would something like this work for your use case?

items = ray.data.read_datasource(
    # custom data source
    TarDatasource(extra_tar_flags="--strip-components 2", profile=True),
    paths=S3_SOURCE_URL,
    filesystem=arrow_fs.S3FileSystem(...),
    include_paths=True,
)

total_items = items.count()
items = items.repartition(ceil(total_items / ITEMS_PER_SHARD))
items = items.map_batches(processing_function)
items.write_parquet(
    path=S3_DESTINATION_URL,
    filesystem=arrow_fs.S3FileSystem(...),
)

@matthewdeng

Thanks! I didn’t realize transformations were distributed across a cluster. For some reason, this seems to read the files twice. See my logs:

(_execute_read_task pid=8732) event=get_tar path=data-small/0.tar files=10000 status=read_end level=info ray_node_id=83d40e17e89aaac211dc3507da89afd8d6b2a96c0afc89bfabc2378d timestamp=1665878065.9118118
(_execute_read_task pid=8732) event=get_tar path=data-small/1.tar files=10000 status=read_end level=info ray_node_id=83d40e17e89aaac211dc3507da89afd8d6b2a96c0afc89bfabc2378d timestamp=1665878088.6969883
(_map_block_nosplit pid=8854) event=get_tar path=data-small/0.tar files=10000 status=read_end level=info ray_node_id=83d40e17e89aaac211dc3507da89afd8d6b2a96c0afc89bfabc2378d timestamp=1665878115.2252595
(_map_block_nosplit pid=8885) event=get_tar path=data-small/1.tar files=10000 status=read_end level=info ray_node_id=83d40e17e89aaac211dc3507da89afd8d6b2a96c0afc89bfabc2378d timestamp=1665878120.2300885

Moreover, if I have 3 files instead of 2, each file is read 3 times, and so on.

Edit: I can fix this by running

items = ray.data.read_datasource(
    TarDatasource(extra_tar_flags="--strip-components 2"),
    paths=S3_SOURCE_URL,
    filesystem=from_fs,
    include_paths=True,
).fully_executed()

before any transformations, but I don’t have good intuition for why this works.

Oh, another problem with the solution that doesn’t explicitly spawn tasks is that my cluster doesn’t autoscale up. This is not a huge problem, since I can just set min_workers, but the other solution does autoscale.

Hi @Vedant_Roy, the files are read twice because:

  • The total_items = items.count() call has to execute the read tasks to compute the total number of records.
  • The actual processing after the .count() call then executes the reads again.

What you did with .fully_executed() is correct. It reads all the files once and pins the data in the cluster for later use, which avoids re-reading those files.
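
The behavior described above can be sketched with a toy model in plain Python. This is only an illustration of lazy reads versus materializing once, not how Ray Datasets are implemented; LazyDataset, read_source, and read_calls are hypothetical names, and the fully_executed method here merely mimics the idea of caching the read result.

```python
from typing import Callable, Dict, List, Optional


class LazyDataset:
    """Toy dataset that re-runs its read function every time it is consumed."""

    def __init__(self, read_fn: Callable[[], List[int]]) -> None:
        self._read_fn = read_fn
        self._cached: Optional[List[int]] = None

    def _rows(self) -> List[int]:
        # Use the cached rows if materialized; otherwise read from the source again.
        if self._cached is not None:
            return self._cached
        return self._read_fn()

    def count(self) -> int:
        return len(self._rows())

    def map(self, fn: Callable[[int], int]) -> List[int]:
        return [fn(x) for x in self._rows()]

    def fully_executed(self) -> "LazyDataset":
        # Read once and keep the result for all later operations.
        self._cached = self._read_fn()
        return self


read_calls: Dict[str, int] = {"n": 0}


def read_source() -> List[int]:
    """Pretend to read a file and record how often that happens."""
    read_calls["n"] += 1
    return [1, 2, 3]


# Without materializing: count() reads, then map() reads again.
lazy = LazyDataset(read_source)
lazy.count()
lazy.map(lambda x: x * 2)
lazy_reads = read_calls["n"]

# With materializing first: one read serves both operations.
read_calls["n"] = 0
eager = LazyDataset(read_source).fully_executed()
eager.count()
eager.map(lambda x: x * 2)
eager_reads = read_calls["n"]

print(lazy_reads, eager_reads)
```

In this toy model the trade-off is memory: the materialized rows stay cached for as long as the dataset object is alive.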
