Write_csv saving data on the same node


import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
from ray.util.placement_group import placement_group

import os

@ray.remote
class Worker:
    def __init__(self, i):
        self.path = '/home/user'

    def save(self, shard):
        print(shard)
        if not os.path.exists(self.path):
            os.mkdir(self.path)
        # Write this shard to the local path; write_csv launches Ray write tasks under the hood.
        shard.write_csv(self.path)

num_workers = 2
pg = placement_group(
        [{"CPU": 1}] * num_workers,
        strategy="STRICT_SPREAD",
    )
ray.get(pg.ready())
workers_options = [Worker.options(num_cpus=1, scheduling_strategy=PlacementGroupSchedulingStrategy(
                placement_group=pg
            )) for i in range(num_workers)]
ds = ray.data.read_csv(paths=DATA, filesystem=FILE_SYSTEM)
workers = [workers_options[i].remote(i) for i in range(num_workers)]
shards = ds.split(num_workers, equal=True, locality_hints=workers)
print(shards)
ray.get([workers[i].save.remote(shards[i]) for i in range(num_workers)])

I was trying the write_csv function from ray.data to dump CSV data across k different nodes after fetching it from a remote filesystem. However, after calling it, both shards are saved on the same node, even though the actors calling write_csv are on different nodes.

Is write_csv independent of the node it is called from, and does it write data based on locality?

Hi @pratkpranav, were you running this across different physical nodes (i.e. machines)? If you ran this script on the same machine with multiple logical nodes, write_csv is going to access the same filesystem.

Hi @jianxiao, I was running it across different physical machines.

Ok. So write_csv() is not tied to the node where it’s called. Under the hood, it may launch a number of additional remote tasks to complete the writing. Where those tasks run depends on scheduling, which can be controlled via the ray_remote_args parameter.
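For example, here is a minimal sketch (not from this thread; it assumes a Ray 2.x release where write_csv accepts ray_remote_args and where NodeAffinitySchedulingStrategy and get_node_id() are available, and the dataset and output path are made up) of pinning the write tasks to the node the caller runs on:

import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

ray.init()

# Toy tabular dataset, just to demonstrate the ray_remote_args hook.
ds = ray.data.from_items([{"x": i} for i in range(100)])

# Node id of the node this code runs on. On older Ray versions this is
# ray.get_runtime_context().node_id.hex() instead of get_node_id().
node_id = ray.get_runtime_context().get_node_id()

# Ask write_csv to schedule its write tasks on that specific node.
ds.write_csv(
    "/tmp/ray_csv_out",
    ray_remote_args={
        "scheduling_strategy": NodeAffinitySchedulingStrategy(node_id=node_id, soft=False)
    },
)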

To ensure all write tasks happen locally, users need to use the local:// URI scheme, which was introduced recently. E.g., ds.write_csv("local:///home/user") ensures the data is written to the /home/user directory on the local node.

Hi @jianxiao,
I tried doing this

import ray
from ray.data.datasource.file_based_datasource import BlockWritePathProvider
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
from ray.util.placement_group import placement_group

import os

from pyarrow import fs

@ray.remote
class Worker:
    def __init__(self, i):
        self.path = f'local:///home/pratik'
        
        
    def save(self, shard):
        print(shard)
        shard.write_csv(self.path, filesystem=fs.LocalFileSystem())
        

num_workers = 2
pg = placement_group(
        [{"CPU": 1}] * num_workers,
        strategy="STRICT_SPREAD",
    )
ray.get(pg.ready())
workers_options = [Worker.options(num_cpus=1, scheduling_strategy=PlacementGroupSchedulingStrategy(
                placement_group=pg
            )) for i in range(num_workers)]
ds = ray.data.read_csv(paths='local:///share/pratik/RayExperiments/RayData/Train.csv', filesystem=fs.LocalFileSystem())
workers = [workers_options[i].remote(i) for i in range(num_workers)]
shards = ds.split(num_workers, equal=True, locality_hints=workers)
print(shards)
ray.get([workers[i].save.remote(shards[i]) for i in range(num_workers)])

It’s still saving both of the split files on the same node.

You are using read_csv with the local URI. This makes the read tasks run locally; you then split them with locality_hints, so the splits are also on the local node; and then you write them with the local URI, so they get written out to the same local node. You may try dropping the local URI and reading from the remote filesystem in read_csv.

This is weird. The following script with num_cpus=1 is still not working. However, if I increase num_cpus to 25 (each node has 48 CPUs), then both nodes start to save files rather than all the files being saved on one node.

‘STRICT_SPREAD’ should make sure that, even with num_cpus=1, the two workers are initialized on different machines.

import ray
from ray.data.datasource.file_based_datasource import BlockWritePathProvider
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
from ray.util.placement_group import placement_group

import os

from pyarrow import fs

@ray.remote
class Worker:
    def __init__(self, i):
        self.path = f'local:///home/user/'
        
        
    def save(self, shard):
        print(shard)
        shard.write_csv(self.path, filesystem=fs.LocalFileSystem())
        

num_workers = 2
pg = placement_group(
        [{"CPU": 1}] * num_workers,
        strategy="STRICT_SPREAD",
    )
ray.get(pg.ready())
workers_options = [Worker.options(num_cpus=1, scheduling_strategy=PlacementGroupSchedulingStrategy(
                placement_group=pg
            )) for i in range(num_workers)]
ds = ray.data.read_csv(paths=FILE_PATH, filesystem=fs.S3FileSystem(access_key=ACCESS_KEY, secret_key=SECRET_KEY,region=REGION))
workers = [workers_options[i].remote(i) for i in range(num_workers)]
shards = ds.split(num_workers, equal=True)
print(shards)
ray.get([workers[i].save.remote(shards[i]) for i in range(num_workers)])

Do you mean that you are setting num_cpus=25 in Worker.options(num_cpus=25, scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)) and then it works? Note that each placement group bundle has only 1 CPU, so this should not even be able to schedule the workers.

Ohh! I did change the “CPU” in PlacementGroup to 25 too.

Can you verify that the worker actors are indeed spread across nodes? You can check this in the Ray Dashboard’s actor view while the job is running (you may add a time.sleep() after the workers = [...] line if it completes too quickly): Ray Dashboard — Ray 2.1.0
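If the dashboard isn’t reachable, a minimal sketch of a programmatic check (it adds a hypothetical get_ip method to the Worker actor from the script above, so it is not standalone) is to ask each actor which host it landed on:

import socket

@ray.remote
class Worker:
    ...  # existing __init__ and save unchanged

    def get_ip(self):
        # Report the IP of the node this actor is running on.
        return socket.gethostbyname(socket.gethostname())

# After `workers = [...]` is created with the STRICT_SPREAD placement group,
# the two printed IPs should differ if the actors really are on different machines.
print(ray.get([w.get_ip.remote() for w in workers]))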

Also, if you want me to reproduce this, do you have a concrete FILE_PATH that I can use?
If not, is it just one CSV file? Anything special about it needed to reproduce?

Even after adding a sleep, somehow I am not able to see the dashboard (maybe a port-forwarding issue, I don’t know). However, I printed the shards after they were passed to the workers, and this is the output. The prints are from different IPs:

(Worker pid=544640) Dataset(num_blocks=1, num_rows=1799999, schema={2: int64, Stuning even for the non-gamer: string, This sound track was beautiful! It paints the senery in your mind so well I would recomend it even to people who hate vid. game music! I have played the game Chrono Cross but out of all of the games I have ever played it has the best music! It backs away from crude keyboarding and takes a fresher step with grate guitars and soulful orchestras. It would impress anyone who cares to listen! ^_^: string})
(Worker pid=177356, ip=192.168.1.10) Dataset(num_blocks=1, num_rows=1799999, schema={2: int64, Stuning even for the non-gamer: string, This sound track was beautiful! It paints the senery in your mind so well I would recomend it even to people who hate vid. game music! I have played the game Chrono Cross but out of all of the games I have ever played it has the best music! It backs away from crude keyboarding and takes a fresher step with grate guitars and soulful orchestras. It would impress anyone who cares to listen! ^_^: string})

It’s just one CSV file, nothing special about it. I tried it with different CSVs too, with no success.

Ok, if it’s just a single CSV file, then this is what happens:

  • The dataset created will just have one block (as your prints showed)
  • This means when you do ds.write_csv(), there will be just one task launched to perform the writing
  • That single task then writes to just one node (a single Ray task doesn’t write to different nodes; see the repartition sketch right after this list)
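If you do want multiple write tasks (and hence multiple output files) from a single input file, one option, sketched below (not taken from this thread; the output path is made up), is to repartition the dataset into more blocks before writing:

import ray

ray.init()

# A single CSV file is read into a dataset with a single block.
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
print(ds.num_blocks())   # typically 1 for a single small file

# Repartition so that write_csv launches one write task per block.
ds = ds.repartition(4)
print(ds.num_blocks())   # 4
ds.write_csv("/tmp/iris_out")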

I ran your script for a single CSV file from S3.

Looking at the dashboard’s actor view, the actors are indeed spread across nodes.

Looking at the object store (you can run the ray memory command to get this from your cluster), it shows that all blocks (including the split blocks) are on the same machine (per IP):

======== Object references status: 2022-12-15 15:25:38.333708 ========
Grouping by node address...        Sorting by object size...        Display all entries per group...


--- Summary for node address: 172.31.231.95 ---
Mem Used by Objects  Local References  Pinned        Used by task   Captured in Objects  Actor Handles
0.0 B                0, (0.0 B)        0, (0.0 B)    0, (0.0 B)     0, (0.0 B)           1, (0.0 B)   

--- Object references for node address: 172.31.231.95 ---
IP Address       PID    Type    Call Site               Status          Size    Reference Type      Object Ref                                              
172.31.231.95    1616   Worker  disabled                -               ?       ACTOR_HANDLE        ffffffffffffffff3bdd45f63c2bdd38dc11aa300a00000001000000

--- Summary for node address: 172.31.209.129 ---
Mem Used by Objects  Local References  Pinned        Used by task   Captured in Objects  Actor Handles
0.0 B                0, (0.0 B)        0, (0.0 B)    0, (0.0 B)     0, (0.0 B)           1, (0.0 B)   

--- Object references for node address: 172.31.209.129 ---
IP Address       PID    Type    Call Site               Status          Size    Reference Type      Object Ref                                              
172.31.209.129   1617   Worker  disabled                -               ?       ACTOR_HANDLE        ffffffffffffffffacdbbf1acfbb0496bad00de90a00000001000000

--- Summary for node address: 172.31.210.116 ---
Mem Used by Objects  Local References  Pinned        Used by task   Captured in Objects  Actor Handles
15677.0 B            4, (15677.0 B)    0, (0.0 B)    0, (0.0 B)     0, (0.0 B)           4, (0.0 B)   

--- Object references for node address: 172.31.210.116 ---
IP Address       PID    Type    Call Site               Status          Size    Reference Type      Object Ref                                              
172.31.210.116   1128   Worker  disabled                -               ?       ACTOR_HANDLE        ffffffffffffffff6f5cd0055ed2332d55413d6f0200000001000000
172.31.210.116   11039  Driver  disabled                FINISHED        ?       ACTOR_HANDLE        ffffffffffffffffacdbbf1acfbb0496bad00de90a00000001000000
172.31.210.116   11039  Driver  disabled                FINISHED        ?       ACTOR_HANDLE        ffffffffffffffff3bdd45f63c2bdd38dc11aa300a00000001000000
172.31.210.116   11039  Driver  disabled                -               ?       ACTOR_HANDLE        ffffffffffffffff6f5cd0055ed2332d55413d6f0200000001000000
172.31.210.116   11039  Driver  disabled                FINISHED        881.0 B  LOCAL_REFERENCE     d85d4408d609a036ffffffffffffffffffffffff0a00000002000000
172.31.210.116   11039  Driver  disabled                FINISHED        3932.0 B  LOCAL_REFERENCE     5573f74645ff0eb8ffffffffffffffffffffffff0a00000002000000
172.31.210.116   11039  Driver  disabled                FINISHED        3932.0 B  LOCAL_REFERENCE     5573f74645ff0eb8ffffffffffffffffffffffff0a00000003000000
172.31.210.116   11039  Driver  disabled                FINISHED        6932.0 B  LOCAL_REFERENCE     d85d4408d609a036ffffffffffffffffffffffff0a00000001000000

To record callsite information for each ObjectRef created, set env variable RAY_record_ref_creation_sites=1

--- Aggregate object store stats across all nodes ---
Plasma memory usage 0 MiB, 0 objects, 0.0% full, 0.0% needed

Lastly, ds.write_csv() also wrote to the corresponding local node.

Here is the code (based on your code):

import ray
from ray.data.datasource.file_based_datasource import BlockWritePathProvider
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
from ray.util.placement_group import placement_group

import os

from pyarrow import fs

FILE_PATH = "s3://anonymous@air-example-data/iris.csv"

@ray.remote
class Worker:
    def __init__(self, i):
        self.path = 'local:///tmp/'

    def save(self, shard):
        print(shard)
        # Print this actor's IP so we can tell which node the shard is written from.
        import socket
        hostname = socket.gethostname()
        IPAddr = socket.gethostbyname(hostname)
        print("deb ip:", IPAddr)
        # Write the shard into a per-IP subdirectory on this node's local filesystem.
        path = self.path + "/" + IPAddr
        shard.write_csv(path, filesystem=fs.LocalFileSystem())
        

num_workers = 2
pg = placement_group(
        [{"CPU": 1}] * num_workers,
        strategy="STRICT_SPREAD",
    )
ray.get(pg.ready())
workers_options = [Worker.options(num_cpus=1, scheduling_strategy=PlacementGroupSchedulingStrategy(
                placement_group=pg
            )) for i in range(num_workers)]

ds = ray.data.read_csv(paths=FILE_PATH).fully_executed()

workers = [workers_options[i].remote(i) for i in range(num_workers)]

shards = ds.split(num_workers, equal=True)
print(shards)

ray.get([workers[i].save.remote(shards[i]) for i in range(num_workers)])