Why is blocking shuffle faster than nonblocking shuffle?

In the spirit of the Exoshuffle paper I have been playing around with shuffles in Ray. The following is my code:

import ray
import time
import torch

M = 8
R = 8
N = 2
ips = ["172.31.30.100", "172.31.18.113"]

@ray.remote(num_returns=R)
def map(m):
    start = time.time()
    data = torch.randn(1000,10000)
    #data = torch.randn(1000,10000,device=torch.device('cuda'))
    print("random data gen used", time.time() - start)
    interval = 1000 // M
    return [data[i * interval: (i+1) * interval] for i in range(R)]

@ray.remote
def reduce(blocks):
    return torch.cat(ray.get(blocks))

def nb_shuffle():
    map_out = [map.options(num_cpus=0.001,num_gpus=0.001, resources={"node:" + ips[m % N]:0.001}).remote(m) for m in range(M)]
    return [reduce.options(num_cpus=0.001,num_gpus=0.001, resources={"node:" + ips[r % N]:0.001}).remote([z[r] for z in map_out]) for r in range(R)]


def b_shuffle():
    outputs = [map.options(num_cpus=0.001,num_gpus=0.001, resources={"node:" + ips[m % N]:0.001}).remote(m) for m in range(M)]
    outputs = [i for j in outputs for i in j]
    while True:
        time.sleep(0.01)
        done_id, outputs = ray.wait(outputs, num_returns = M * R, fetch_local=False)
        print(done_id, outputs)
        if len(outputs) == 0:
            break
    print(done_id)
    return [reduce.options(num_cpus=0.001,num_gpus=0.001, resources={"node:" + ips[r % N]:0.001}).remote([done_id[r + i * R] for i in range(M)]) for r in range(R)]

start = time.time()
results = nb_shuffle()
z = ray.get(results)
print(z[0].shape)
print(time.time() - start)

On my two-machine cluster, the blocking version, which uses ray.wait to make sure all of the map tasks have produced their output before launching the reducers, is routinely faster than the nonblocking version, where the reducers decide for themselves when to start based on data availability. This holds for many different values of M and R, and the effect is more pronounced when M and R are larger.

This seems to go against the philosophy of Ray as a distributed futures system, so there must be some simple explanation that I am missing.

Great question!

First, note that for this version of the shuffle, the blocking and non-blocking versions are actually expected to execute in about the same amount of time. That’s because each reduce task needs the output of every map task, so it cannot make progress until all of the map tasks have finished, and it shouldn’t matter much whether you explicitly block using ray.wait.

The reason you’re seeing a difference most likely comes down to how your script is using ObjectRefs. In Ray, passing ObjectRefs to a task inside a list (or any other Python container) does not create a scheduling dependency on those refs, so it will not block the task’s execution. That means in your non-blocking version, the reduce tasks actually get assigned to a worker and start executing even before the map tasks have finished. You can check this yourself by putting a print statement at the beginning of the reduce function to see when it starts executing.
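
To make the list-vs-direct-argument distinction concrete, here is a minimal standalone sketch (the slow / takes_a_list / takes_args names are just for illustration, not part of your script):

import time
import ray

@ray.remote
def slow():
    time.sleep(5)
    return 1

@ray.remote
def takes_a_list(refs):
    # refs arrives as a plain Python list of ObjectRefs, so Ray schedules this
    # task right away; it only waits once it calls ray.get() itself.
    print("takes_a_list started")
    return sum(ray.get(refs))

@ray.remote
def takes_args(*values):
    # ObjectRefs passed as top-level arguments are resolved by Ray first, so
    # this task is not scheduled until every slow() task has finished.
    print("takes_args started")
    return sum(values)

refs = [slow.remote() for _ in range(4)]
print(ray.get(takes_a_list.remote(refs)))   # "started" is printed almost immediately
print(ray.get(takes_args.remote(*refs)))    # "started" is printed only after the slow() tasks finish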

In addition, you probably don’t want to pass num_cpus=0.001, since that allows many more tasks to execute in parallel than there are cores available, which can cause unnecessary resource contention and memory pressure.
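
As a rough illustration of why the fractional request hurts (a standalone sketch with a made-up heavy task and made-up sizes, not your shuffle code):

import ray

ray.init(num_cpus=8)  # e.g. a single 8-core machine

@ray.remote  # the default request is num_cpus=1
def heavy():
    # Each task holds a few tens of MB while it runs.
    return sum([0] * 5_000_000)

# Default: Ray runs at most 8 of these concurrently on this node, so peak
# memory use and CPU contention stay bounded.
ray.get([heavy.remote() for _ in range(64)])

# num_cpus=0.001: Ray may co-schedule up to ~1000 tasks per core, so all 64
# can run at once, which mostly adds memory pressure and contention.
ray.get([heavy.options(num_cpus=0.001).remote() for _ in range(64)])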

So most likely, it’s a combination of these two factors that’s causing the difference between the two versions. Your non-blocking script is counterintuitively creating more resource pressure, because it starts extra reduce workers that then immediately block inside ray.get, since the map tasks haven’t finished yet.

Here’s an updated version of your script that:

  1. Uses Python’s variable-length arguments to pass ObjectRefs directly to the reduce task. This will change the reduce function to take each ObjectRef as a separate direct argument, instead of a single argument that is a list of ObjectRefs. Now, Ray will not schedule a reduce task until all of its arguments are ready.
  2. Removes the custom resource requirements. Unless you have specific scheduling needs (e.g., there is a file that only exists on one node, or you want to colocate a specific task with a GPU), it is often best to just use the defaults.
  3. Runs the shuffle multiple times. Especially for shorter applications that run in a minute or less, the first run includes warmup time to start Ray processes, so the timing of the first iteration is not very reliable.

I’ve only run this on my laptop, but I did not see a significant difference between the two versions:

import ray
import time
import torch

M = 8
R = 8

@ray.remote(num_returns=R)
def map(m):
    start = time.time()
    data = torch.randn(1000,10000)
    #data = torch.randn(1000,10000,device=torch.device('cuda'))
    print("random data gen used", time.time() - start)
    interval = 1000 // M
    return [data[i * interval: (i+1) * interval] for i in range(R)]

@ray.remote
def reduce(*blocks):  # Note the use of * here.
    return torch.cat(blocks)

def nb_shuffle():
    map_out = [map.remote(m) for m in range(M)]
    # Pass each ObjectRef as a separate argument so Ray does not schedule the
    # reduce task until all of its map inputs are ready.
    return [reduce.remote(*[z[r] for z in map_out]) for r in range(R)]


def b_shuffle():
    outputs = [map.remote(m) for m in range(M)]
    # Flatten the M lists of R ObjectRefs into a single list of M * R refs.
    outputs = [i for j in outputs for i in j]
    while True:
        time.sleep(0.01)
        # Block until all M * R map outputs exist (fetch_local=False avoids
        # pulling the data to this node).
        done_id, outputs = ray.wait(outputs, num_returns=M * R, fetch_local=False)
        print(done_id, outputs)
        if len(outputs) == 0:
            break
    print(done_id)
    return [reduce.remote(*[done_id[r + i * R] for i in range(M)]) for r in range(R)]

for _ in range(3):
    start = time.time()
    results = nb_shuffle()
    z = ray.get(results)
    print(z[0].shape)
    print(time.time() - start)
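
If you want to time both paths side by side, you can swap in a driver loop like this (same functions as above; the only change is iterating over both shuffle variants):

for shuffle in (nb_shuffle, b_shuffle):
    for _ in range(3):
        start = time.time()
        z = ray.get(shuffle())
        print(shuffle.__name__, z[0].shape, time.time() - start)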



Thanks for the detailed response, you guys are the best!