Proper workflow for keeping Ray memory clean and separating returned Python objects from their Ray references

How severe does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

I have been testing Ray for the past few months to determine whether it is viable for our company as a way to speed up and scale Python development and processing. I am using Ray as a standalone tool, testing locally as well as on a remote, single-node cluster (a server).

I am starting out with what I think should be a simple job: submit tasks, parallelize them, and return the transformed results from an input ray.remote function. However, I have run into problems as my datasets grow larger. Available memory keeps filling up and objects are then spilled. No error is raised, but I need to prevent spilling and limit memory usage. The solution that makes sense to me is to process the tasks in chunks so that Ray doesn’t see so many objects at once. Unfortunately, I can’t find a way to separate the returned Python objects from their “pinned” status in Ray, so once a chunk is processed, its objects are stuck in Ray until they are deleted from the Python side. This somewhat defeats the purpose for me, as I am trying to aggregate the results on the Python side so they can be viewed, used, or stored.

My question is: what is the expected workflow to take care of this?
Should I be appending all results to a file and then deleting the Python object?
Should I be adding all values to a separate database and then deleting the Python object?
Can Ray[data] play a role to solve this?

Here is my current, simplified usage flow (a rough sketch of it follows the list):

  1. Spawn n actors where n=num_cpus
  2. Add all the actors to a ray.util.ActorPool()
  3. Submit chunks of the task list to the pool using pool.map_unordered() or pool.submit()
  4. Collect results, either in the chunks they were submitted in
self.results.extend(results)

or eagerly as they are completed

while self.pool.has_next():
    self.results.append(self.pool.get_next_unordered())

depending on how they were submitted
  5. Kill all of the actors in the pool using ray.kill()
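Roughly, a stripped-down sketch of this flow looks like the code below (the Worker class, its process method, and the task list are just placeholders, not my real code):

import os
import ray
from ray.util import ActorPool

ray.init()

@ray.remote
class Worker:
    def process(self, task):
        return task * 2  # placeholder transformation

num_cpus = os.cpu_count()
actors = [Worker.remote() for _ in range(num_cpus)]  # 1. spawn n actors
pool = ActorPool(actors)                             # 2. add them to a pool

tasks = list(range(1000))
results = []
# 3./4. submit the task list and collect results eagerly as they complete
for result in pool.map_unordered(lambda a, t: a.process.remote(t), tasks):
    results.append(result)

for actor in actors:                                 # 5. tear down the pool
    ray.kill(actor)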

I suspect that I have additional memory problems, as Ray seems to hold on to a chunk of memory even after the job is completed and ray memory shows no memory in use. That memory is only released when I stop the Ray cluster.

Any feedback or help would be useful as I am quite new to this. I’m happy to provide more info if needed.

Hi @indierambler,

If your Python objects (e.g. numpy arrays) are backed by Ray object store memory, then that object store memory won’t be freed until your Python object goes out of scope. What is the type of your Python objects?

Thanks for the reply! Yes, from the memory docs it appears that the Python object stays pinned in Ray until it goes out of scope. I’m trying to figure out a way to take it out of scope without deleting the object. The typical object I use is a dict carrying an ID string and a result numpy array, as follows:

result_dict = {
    'id': 'some id',
    'result': np.array([some_values])
}

Copying and deep copying don’t appear to take it out of scope (and they also waste a lot of RAM). I’m currently trying the ray.internal.internal_api.free(actor) method from the Ray source code, but I’m also wondering if there is a preferred way to break this link so that I can keep objects in my preferred Python memory and use the Ray object store as dynamic memory to speed up a batch job.

I’m assuming your results are very large here (they cannot fit into memory even ignoring what Ray is doing). In that case, you can consider writing them to disk as you process them batch by batch, though this is fairly similar to what Ray does internally with object spilling.
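If it helps, here is a rough sketch of that per-batch flush, given the id-plus-numpy-array results you described above (the batches variable and file layout are just placeholders):

import gc
import os
import numpy as np

def flush_batch(batch, batch_idx, out_dir="results"):
    # write one compressed .npz per batch, keyed by result id
    os.makedirs(out_dir, exist_ok=True)
    arrays = {r['id']: r['result'] for r in batch}
    np.savez_compressed(os.path.join(out_dir, f"batch_{batch_idx:05d}.npz"), **arrays)

for i, batch in enumerate(batches):  # batches = your chunked result lists
    flush_batch(batch, i)
    del batch                        # drop the Python references so Ray can free the backing objects
    gc.collect()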

Copying and deep copying don’t appear to take it out of scope

Hmm, deep copying should take it out of scope (specifically, you want to .copy() the numpy array here). You can try this with a single result and check that it no longer shows up in ray memory after copying and un-refing the original object. Another possibility is that the object has not been garbage collected by Python yet (you can use gc.collect() to force a GC).
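As a quick sanity check with a single result (some_task here is just a stand-in for one of your remote functions):

import copy
import gc
import ray

ref = some_task.remote()       # stand-in for one of your remote calls
result = ray.get(ref)          # zero-copy view backed by the object store

owned = copy.deepcopy(result)  # deep copy so the numpy array no longer points at the object store
del result, ref                # un-ref the original object
gc.collect()                   # force Python GC so the references are actually dropped

# At this point the object should no longer show up in `ray memory`,
# while `owned` remains usable from plain Python memory.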

Ray Datasets offers two ways of scaling this kind of batch processing and storing the results:

  1. You can use a normal dataset and call ray.data.from_items(...).map(processing_callable_fn, compute=ActorPoolStrategy(min, max)).write_parquet(...) to load, process, and write the results to disk (a fuller end-to-end sketch follows after this list). Any stateful actor setup you need can go in the callable class:
class processing_callable_fn:
    def __init__(self):
        self.state = ...  # stateful setup

    def __call__(self, x):
        return x  # process x
  2. The above strategy will process all data in memory. If you want to limit memory usage to avoid spilling, you could use the windowing function to pipeline the computation:
ds = ray.data.from_items(...)

# Divide the dataset into 100 blocks, and process 10 at a time in memory.
pipeline = ds.repartition(100).window(blocks_per_window=10)

# Execute the pipelined computation and write to disk.
pipeline.map(...).write_parquet(...)
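Putting option (1) together end to end would look roughly like this (the pool sizes and output path are just placeholders):

import ray
from ray.data import ActorPoolStrategy

ds = ray.data.from_items(items)  # 'items' stands in for your task inputs

# Process with a pool of 2-8 callable-class actors and write the results to disk.
ds.map(processing_callable_fn, compute=ActorPoolStrategy(2, 8)) \
  .write_parquet("/tmp/results")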

Hope this is useful.

Thanks @ericl, that’s a lot of good information! I will first try a couple of changes to my current actor pool version, but I’m very interested in testing the Datasets solutions.

One thing I wonder is whether there is a difference between testing in a Jupyter notebook and running plain scripts. I have read that Jupyter does funny things with memory.

Ah, one gotcha is that Jupyter tends to secretly save variable references (cell results?) in its In/Out command history. So this could be keeping references alive even if you don’t intend it.

I think one workaround is to avoid surfacing variables in the top-level / as cell results.
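For example, in a notebook something like the following silently keeps the data alive (hedging a bit here, since the exact caching behavior depends on the IPython version):

# A bare expression at the end of a cell is cached by IPython's output history:
results                   # now also referenced from Out[n] and _, so it can't be freed

# Workarounds: surface only a summary, and clear the history if it already holds references.
print(len(results))       # doesn't cache the object itself
%reset out                # IPython magic that clears the Out cache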

I made a change in my code so that as soon as a result is received, it is immediately deep-copied into a local variable, and the local variable is then appended to the class attribute. This does break the pinned connection to the object store, so all is good! (Although it does add time to my processing.)

[future], _ = ray.wait(list_of_futures, num_returns=1)
result = ray.get(future)
self.results.append(copy.deepcopy(result))
#del result  # not needed because result is a local function variable
#ray.internal.internal_api.free([future])  # this function did not work as I hoped

I did have to rewrite my own version of the ray.util.ActorPool in order to get direct access to the future, but that wasn’t too bad in the end.
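It ended up being basically a thin loop around ray.wait. A rough sketch of the idea (much simplified, using the placeholder Worker/process names from my earlier example rather than my real code):

import copy
import ray

def run_pool(actors, tasks):
    results = []
    in_flight = {}  # maps in-flight future -> the actor working on it
    tasks = list(tasks)

    # prime each actor with one task
    for actor in actors:
        if tasks:
            in_flight[actor.process.remote(tasks.pop())] = actor

    while in_flight:
        [future], _ = ray.wait(list(in_flight), num_returns=1)
        actor = in_flight.pop(future)
        results.append(copy.deepcopy(ray.get(future)))  # deep copy breaks the object-store pin
        del future                                      # drop the ref so the object can be freed
        if tasks:
            in_flight[actor.process.remote(tasks.pop())] = actor
    return results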

In the future I may add some code to try leaving the objects pinned until plasma is full and then batch-unpinning them. That might speed things up a bit…

Ray Datasets may be a more interesting option for me in the future.
