Why unpacking doesn't work with ray objects

Hi, I was using Ray for a project and encountered this problem. To illustrate, here is a toy example which intends to write a wrapper to run any function on remote. The wrapper takes the function to be executed and its inputs as arguments.

Now there are two ways to pass the inputs, as shown by wrapper and wrapper2. Both should be equivalent, I guess. However, with jobB, wrapper() gives an error saying that it cannot add two Ray objects, while wrapper2 works fine.

I am unable to explain this behaviour (maybe it is related to python, and not ray). Any insight would be appreciated. Thanks:)


######################################################################
import time

import ray

ray.init()

@ray.remote
def wrapper(func, inputs):
	out = func(*inputs)
	return out

@ray.remote
def wrapper2(func, *inputs):
	input_list = [ip for ip in inputs]
	out = func(*input_list)
	return out

def jobA(x):
	time.sleep(1)
	return x*x

def jobB(x,y):
	time.sleep(1)
	return x+y

######################################
ob_A1 = wrapper.remote(jobA,(1,))
ob_A2 = wrapper.remote(jobA,(2,))

inputs = (ob_A1,ob_A2,)
######## DOES NOT WORK######
ob_B1  = wrapper.remote(jobB, inputs)

#########THIS WORKS ###########
ob_B2  = wrapper2.remote(jobB, *inputs)

print(ray.get(ob_B1))
print(ray.get(ob_B2))
_________________________________________
__________________

Hi Chirag!

wrapper.remote(jobB, inputs) doesn’t work because Ray only automatically resolves Ray objects that are top-level task arguments. Here inputs is a tuple of Ray objects, so ob_A1 and ob_A2 are not resolved and jobB ends up trying to add two Ray objects. In other words, Ray does not traverse arbitrary data structures in the task arguments to find Ray objects, resolve them, and repackage them into the original structure when the task executes. Any Ray object that you want resolved automatically has to be a top-level argument.

Meanwhile, in wrapper2.remote(jobB, *inputs), ob_A1 and ob_A2 are each top-level arguments to the wrapper2 task, so those Ray objects are resolved to their concrete values as expected.

I actually opened a feature request a while back for adding support for resolving Ray objects arbitrarily nested in task argument data structures. If you think that would be a useful feature for your use case, please +1 that issue!

Thank you Clark for the detailed reply! Makes a lot of sense now

I think your feature request would be very useful in closing the gap between ray futures and simple outputs (as in a single threaded program). Abstractions that would make the two indistinguishable would be nice indeed! (liked the feature request)

As a follow-up: when passing the output of one remote function to another remote function, is that the same as doing a ray.get and then using the value? That is, are these two the same:

A)
@ray.remote
def func(input_future):
	output = input_future + 1
	return output

B)
@ray.remote
def func(input_future):
	input = ray.get(input_future)
	output = input + 1
	return output

I understand ray.get is blocking, but that should be fine since one worker handles one remote function (my guess).

Thanks and have a good day!

Those are functionally the same, but A) is preferable since making the future a task argument allows Ray to take both the size of the object and the time at which the object has been materialized somewhere in the cluster into account when scheduling the task. I.e., if input_future is the output of another task, the task in A) won’t be scheduled onto a worker until the task that creates input_future has finished executing, and Ray will do some other fancy stuff like (in a multi-node cluster) attempt to execute the task in A) on the node where the input_future object already exists so the object doesn’t have to be transferred to another machine (locality-aware scheduling).

In B), Ray doesn’t know that your task needs input_future until the task is already executing, so we can’t take that dependency into account when we’re trying to make smart scheduling decisions.

Got it! Thanks Clark.
