Dataset and task compute pipelining

I was wondering how I could use dataset pipelining and task compute pipelining.

I currently have two tasks (both utilizing one cpu resource) and I want both of them to run on specific nodes without explicitly pinning them.

For eg. I would want my dataset pipeline to feed task1 on node 1 and then results of actor1 to feed task2 on node 2

I have used the SPREAD scheduling strategy and a placement group with two bundles (each having one cpu resource) but if I run the tasks in a loop then I sometimes see task1 on node 2 and task2 on node 1

I was wondering how I could get the scheduler to do this without using NodeAffinitySchedulingStrategy

cc @jjyao @Clark_Zinzow can you guys address this question?

Just wanted to follow up in order to request support for this

@abhullar If I understand you correctly, I think what you did/saw is correct/expected

  • Without pinning tasks to nodes, using SPREAD should make scheduler spread the tasks across all the two nodes you provisioned. There is no guarantee that task1 will always on node1 and task2s on node2.
  • If you do want to have this assignment, you may use NodeAffinitySchedulingStrategy and set soft to false. In this case the assignment is rigid and less tolerant to node failures.

Is it possible to have multiple tasks where some are using SPREAD and some use NodeAffinitySchedulingStrategy?

Im not able to import NodeAffinitySchedulingStrategy, I installed ray using

pip install -U "ray[default]"
# installed version is ray, version 1.12.1

I don’t see NodeAffinitySchedulingStrategy in ray/util/

It is introduced in this PR: Node affinity scheduling strategy (#23381) · ray-project/ray@95714cc · GitHub, so not in 1.12 yet. Can you try master branch? If you are waiting for it, the good news is that the 1.13 release is coming soon which will have it.

1 Like

@abhullar Yes, you can submit those tasks as two batches, something like:

def my_func(int idx):

# using SPREAD in first batch
[my_func.option(scheduling_strategy="SPREAD").remote(i) for i in range(10)]

# using node affinity in second batch
# The destination_node_id can be found by calling ray.nodes()
[my_func.option(scheduling_strategy=NodeAffinitySchedulingStrategy(destination_node_id, soft = False)).remote(i) for i in range(5)]