Ok, if it’s just a single CSV file, then this is what happens:
- The dataset created will have just one block (as your prints showed).
- This means that when you call ds.write_csv(), only one write task is launched to perform the write.
- That single task writes to only one node (a single Ray task does not write to multiple nodes); see the sketch below for one way to get multiple write tasks.
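If the goal is to spread the write itself, one option (a minimal sketch, not from your original script; the output path /tmp/iris_out is just a placeholder) is to repartition the single block into several blocks before writing, since write_csv launches one write task per block:

import ray

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv").fully_executed()
print(ds.num_blocks())  # 1 for a single small CSV file -> a single write task

# Split the single block into 4 blocks so write_csv launches 4 write tasks,
# which the scheduler may then place on different nodes.
ds = ds.repartition(4)
ds.write_csv("/tmp/iris_out")  # placeholder output path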
I ran your script for a single CSV file from S3.
The actor view (from the Ray dashboard) shows the actors are indeed spread across nodes.
Looking at the object store (you can run the ray memory command on your cluster to get this), it shows that all blocks (including the split blocks) are on the same machine (per IP):
======== Object references status: 2022-12-15 15:25:38.333708 ========
Grouping by node address... Sorting by object size... Display all entries per group...
--- Summary for node address: 172.31.231.95 ---
Mem Used by Objects Local References Pinned Used by task Captured in Objects Actor Handles
0.0 B 0, (0.0 B) 0, (0.0 B) 0, (0.0 B) 0, (0.0 B) 1, (0.0 B)
--- Object references for node address: 172.31.231.95 ---
IP Address PID Type Call Site Status Size Reference Type Object Ref
172.31.231.95 1616 Worker disabled - ? ACTOR_HANDLE ffffffffffffffff3bdd45f63c2bdd38dc11aa300a00000001000000
--- Summary for node address: 172.31.209.129 ---
Mem Used by Objects Local References Pinned Used by task Captured in Objects Actor Handles
0.0 B 0, (0.0 B) 0, (0.0 B) 0, (0.0 B) 0, (0.0 B) 1, (0.0 B)
--- Object references for node address: 172.31.209.129 ---
IP Address PID Type Call Site Status Size Reference Type Object Ref
172.31.209.129 1617 Worker disabled - ? ACTOR_HANDLE ffffffffffffffffacdbbf1acfbb0496bad00de90a00000001000000
--- Summary for node address: 172.31.210.116 ---
Mem Used by Objects Local References Pinned Used by task Captured in Objects Actor Handles
15677.0 B 4, (15677.0 B) 0, (0.0 B) 0, (0.0 B) 0, (0.0 B) 4, (0.0 B)
--- Object references for node address: 172.31.210.116 ---
IP Address PID Type Call Site Status Size Reference Type Object Ref
172.31.210.116 1128 Worker disabled - ? ACTOR_HANDLE ffffffffffffffff6f5cd0055ed2332d55413d6f0200000001000000
172.31.210.116 11039 Driver disabled FINISHED ? ACTOR_HANDLE ffffffffffffffffacdbbf1acfbb0496bad00de90a00000001000000
172.31.210.116 11039 Driver disabled FINISHED ? ACTOR_HANDLE ffffffffffffffff3bdd45f63c2bdd38dc11aa300a00000001000000
172.31.210.116 11039 Driver disabled - ? ACTOR_HANDLE ffffffffffffffff6f5cd0055ed2332d55413d6f0200000001000000
172.31.210.116 11039 Driver disabled FINISHED 881.0 B LOCAL_REFERENCE d85d4408d609a036ffffffffffffffffffffffff0a00000002000000
172.31.210.116 11039 Driver disabled FINISHED 3932.0 B LOCAL_REFERENCE 5573f74645ff0eb8ffffffffffffffffffffffff0a00000002000000
172.31.210.116 11039 Driver disabled FINISHED 3932.0 B LOCAL_REFERENCE 5573f74645ff0eb8ffffffffffffffffffffffff0a00000003000000
172.31.210.116 11039 Driver disabled FINISHED 6932.0 B LOCAL_REFERENCE d85d4408d609a036ffffffffffffffffffffffff0a00000001000000
To record callsite information for each ObjectRef created, set env variable RAY_record_ref_creation_sites=1
--- Aggregate object store stats across all nodes ---
Plasma memory usage 0 MiB, 0 objects, 0.0% full, 0.0% needed
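If you prefer checking this programmatically rather than reading the ray memory dump, here is a minimal sketch (assuming the Ray 2.x APIs Dataset.get_internal_block_refs and ray.experimental.get_object_locations) that prints which node holds each block:

import ray

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv").fully_executed()
block_refs = ds.get_internal_block_refs()
# Maps each block ObjectRef to metadata such as "node_ids" and "object_size".
locations = ray.experimental.get_object_locations(block_refs)
for ref, loc in locations.items():
    print(ref, loc.get("node_ids"), loc.get("object_size"))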
Lastly, the ds.write_csv() call in each actor also wrote to the corresponding local node.
Here is the code (based on your code):
import ray
from ray.data.datasource.file_based_datasource import BlockWritePathProvider
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
from ray.util.placement_group import placement_group
import os
from pyarrow import fs

FILE_PATH = "s3://anonymous@air-example-data/iris.csv"


@ray.remote
class Worker:
    def __init__(self, i):
        self.path = "local:///tmp/"

    def save(self, shard):
        print(shard)
        import socket

        hostname = socket.gethostname()
        IPAddr = socket.gethostbyname(hostname)
        print("deb ip:", IPAddr)
        # Write the shard under /tmp/<node ip> on the node this actor runs on.
        path = self.path + "/" + IPAddr
        shard.write_csv(path, filesystem=fs.LocalFileSystem())


num_workers = 2

# One 1-CPU bundle per worker, forced onto different nodes.
pg = placement_group(
    [{"CPU": 1}] * num_workers,
    strategy="STRICT_SPREAD",
)
ray.get(pg.ready())

workers_options = [
    Worker.options(
        num_cpus=1,
        scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg),
    )
    for i in range(num_workers)
]

ds = ray.data.read_csv(paths=FILE_PATH).fully_executed()
workers = [workers_options[i].remote(i) for i in range(num_workers)]

# Split the dataset into one equal shard per worker and have each worker
# write its shard to its own node's local filesystem.
shards = ds.split(num_workers, equal=True)
print(shards)
ray.get([workers[i].save.remote(shards[i]) for i in range(num_workers)])
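To sanity-check that each actor wrote to its own node, one possible follow-up (a hypothetical sketch that reuses the same placement group and assumes the files land under /tmp/<ip>, matching the paths built in Worker.save) is:

# Free the bundle CPUs the actors were holding, then run one listing task per bundle.
for w in workers:
    ray.kill(w)

@ray.remote(num_cpus=1)
def list_local_output():
    import glob
    import socket
    ip = socket.gethostbyname(socket.gethostname())
    return ip, glob.glob(f"/tmp/{ip}/*")

print(ray.get([
    list_local_output.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)
    ).remote()
    for _ in range(num_workers)
]))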