How to speed up ray.get() when fetching a large object from another node?

There is an 800MB object on one node. The actor on that node calls ref = ray.put(obj) and passes the returned object reference to an actor on another node. That actor fetches the object with ray.get(ref), but the call sometimes takes more than 2 minutes, which seems unreasonable. What could cause this? Is there any way to speed up ray.get(ref)? Or is there a way to transfer the object to the other node in advance?
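For context, the kind of "transfer in advance" I have in mind would look roughly like the sketch below: a trivial task pinned to the target node that forces the object to be pulled there. This is just my idea of the shape of the API; prefetch_to_node is my own helper, not something Ray provides:

    import ray
    from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

    @ray.remote
    def _pull(ref_list):
        # Calling ray.get() inside a task pulls the object into the local
        # object store of whichever node runs this task.
        ray.get(ref_list[0])
        return True

    def prefetch_to_node(ref, node_id):
        # Wrapping the ref in a list stops Ray from resolving it as a task
        # argument; the fetch happens through the explicit ray.get() above.
        return _pull.options(
            scheduling_strategy=NodeAffinitySchedulingStrategy(
                node_id=node_id, soft=True
            )
        ).remote([ref])

Is something along these lines reasonable, or does Ray have a built-in way to do this?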


Hi,

This definitely doesn't sound like expected behavior in most environments. A couple of questions:

  1. Which environment is this operating in? Do you have sufficient network throughput for the transfer to complete more quickly?
  2. Could you please include a code snippet so I can double check your workflow?

Thanks,
Jack

Hi @jhumphri, thanks for your reply. I am training on two nodes with 8x A30 GPUs each, and the network cards support 10 Gbps. Below is the problematic code; while it runs, the bandwidth from the head node to the worker node reaches 800MB/s, which is approximately twice the weight size per second.

    def pull_model_on_tick(self):
        msg = build_get_model_msg("SINGLE", self._league_id, 0, -1)
        # model is a tuple
        model = ray.get(self._model_buffer.put_msg.remote(msg))

        if model is None:
            return False
        # model_value is a ray.ObjectRef
        model_meta, model_value = model

        if model_meta.step != self._current_train_model_steps:
            if isinstance(model_value, ray.ObjectRef):
                t = time.time()
                # model_value references ~800MB of numpy model weights. ray.get()
                # pulls the object from the remote node to the node where this
                # actor runs. Sometimes this ray.get() takes about 1~2s, and
                # sometimes it stalls for 2 minutes.
                model_value = ray.get(model_value)
                self._logger.info(f"get model_value time: {time.time() - t}")

            self._model = model
            self._current_train_model_steps = model_meta.step
        return

    # Because the model has already been pulled into the local object store by
    # pull_model_on_tick(), other actors on this node should not need to pull
    # from other nodes when calling get_model().
    def get_model(self):
        return self._model

But when I guarded the model with a lock, the latency problem disappeared and the bandwidth stayed at about 400MB/s. The modified code is as follows:

    def pull_model_on_tick(self):
        ...

        if model_meta.step != self._current_train_model_steps:
            if isinstance(model_value, ray.ObjectRef):
                with self._m_lock:
                    t = time.time()
                    model_value = ray.get(model_value)
                    self._logger.info(f"get model_value time: {time.time() - t}")

                    self._model = model
                    self._current_train_model_steps = model_meta.step
        return

    def get_model(self):
        with self._m_lock:
            return self._model

I suspect that when access to the model is not locked, an actor calling get_model() on the same node also pulls a copy of the weights from the head node, which would explain the doubled bandwidth. But as far as I know, Ray executes an actor's requests in order, so why would this happen?
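To make the question concrete: the only way I can see get_model() overlapping with pull_model_on_tick() is if the actor's methods can run concurrently, for example on a threaded actor. A minimal illustration of that situation (not my production code):

    import ray

    # With max_concurrency > 1, actor methods run on a thread pool, so
    # get_model() can execute while pull_model_on_tick() is still blocked
    # inside ray.get(); without a lock, both threads touch self._model
    # at the same time.
    @ray.remote(max_concurrency=4)
    class ModelHolder:
        def __init__(self):
            self._model = None

        def pull_model_on_tick(self, ref):
            self._model = ray.get(ref)  # slow cross-node fetch

        def get_model(self):
            return self._model  # may run in the middle of the fetch

Is that the kind of interleaving that could explain what I am seeing?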

Thanks for your reply. I have a few more questions before we dig deeper:

  1. Is the weight size 800MB or 400MB? The original post mentions the former, while the reply seems to imply the latter.
  2. Is the reply saying that the object size doubles or that the transfer throughput doubles when the model is not locked?
  3. Could you please elaborate on how the model locking works and what with self._m_lock does?
  4. How much DRAM does each node have?

Thanks!

Hi @jhumphri , thank you for your patience.

  1. The weight size is 400MB.
  2. When I run the problematic code, the bandwidth from the head node to the worker node reaches about 800MB/s, so I suspect there may be two copies of the weights being transmitted from head to worker at the same time.
  3. The self._m_lock is a thread lock, initialized in the class constructor via self._m_lock = threading.RLock() (see the sketch after this list).
  4. The memory capacity of each node is 300GiB.
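For concreteness, the relevant parts of the constructor look roughly like this (trimmed; the class name and initial values here are illustrative):

    import threading

    class ModelActor:
        def __init__(self):
            self._m_lock = threading.RLock()      # guards access to self._model
            self._model = None                    # (model_meta, model_value) tuple
            self._current_train_model_steps = -1  # step of the cached model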

Thanks. It probably makes sense for me to guide you through running a perf analysis on Ray and the host OS so we can see what is happening during the two-minute transfer, but I have one more question first. In get_model() in the code above, self._model is returned, but the wording in the post seems to imply that get_model() calls ray.get() if the model has not yet been transferred to the local node's object store. Could you please clarify how this could happen? As written, an invalid model would simply be returned in that case; ray.get() would not be called.
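In the meantime, one lightweight way to get visibility into the stall is to poll with ray.wait() instead of blocking silently inside ray.get(). A sketch (ray.wait() fetches the object to the local node by default, so the final ray.get() should return quickly once the wait completes):

    import time
    import ray

    def timed_get(ref, poll_s=5.0, log=print):
        # ray.wait() with the default fetch_local=True starts pulling the
        # object to this node and only reports it ready once it is local.
        start = time.time()
        ready, _ = ray.wait([ref], timeout=poll_s)
        while not ready:
            log(f"still fetching after {time.time() - start:.1f}s")
            ready, _ = ray.wait([ref], timeout=poll_s)
        value = ray.get(ref)  # object is local now, so this should be fast
        log(f"total fetch time: {time.time() - start:.1f}s")
        return value

Logging at a fixed interval like this makes it obvious whether the two minutes are spent in the transfer itself or somewhere else.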

Hi @jhumphri, maybe I didn't express it clearly. My understanding is that Ray processes an actor's requests sequentially, so when get_model() is called, it should return the current value of self._model. If self._model has not yet been updated by pull_model_on_tick(), the old value is returned, and the objects that self._model references should already have been pulled into the current node's object store by pull_model_on_tick(). But without the self._m_lock guard, I observed about 800MB/s of bandwidth (roughly twice the model weight size), as if the calling actor were also pulling the objects from the head node. This is just my suspicion.
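To test this suspicion, my plan is to log where the copies of the object live before and after get_model() is called, along these lines (a sketch; as I understand it, ray.experimental.get_object_locations() reports the node IDs currently holding each object, though the exact output format may vary by Ray version):

    import ray

    def log_locations(ref, logger):
        # Returns something like {ref: {"node_ids": [...], "object_size": ...}};
        # if the worker node's ID already appears here before get_model() runs,
        # a second transfer from the head node should not be needed.
        locs = ray.experimental.get_object_locations([ref])
        logger.info(f"object locations: {locs}")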