The whole process of the Ray.task().remote() Java API

Hi, I am looking into the Ray.task().remote() Java API and trying to understand the whole process of a remote task, but I am kind of stuck in the C++ part.
My understanding is that when remote() is invoked, it finally calls Java_io_ray_runtime_task_NativeTaskSubmitter_nativeSubmitTask() -> CoreWorkerProcess::GetCoreWorker().SubmitTask() -> task_manager_->AddPendingTask(). But I can’t see who is responsible for scheduling the task. Can anyone tell me what comes next after AddPendingTask()? Thank you very much for your response!

Hello, can anyone help on this topic?

@raulchen could you help on this one?

Hi @JUAN_CHEN,
If it’s an actor task, the next step is direct_actor_submitter_->SubmitTask(task_spec) in ray/core_worker.cc (ray-project/ray on GitHub, master branch).

If it’s a normal task, the next step is direct_task_submitter_->SubmitTask() in ray/core_worker.cc (ray-project/ray on GitHub, master branch).

Some of the local scheduling logic lives in direct_actor_submitter and direct_task_submitter.
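To make the hand-off concrete, here is a toy C++ model of what happens after AddPendingTask(). All names in it (TaskSpec, TaskManager, ToyCoreWorker and the two submitter structs) are hypothetical simplifications rather than Ray's real classes, and in Ray the actor path actually enters through a separate CoreWorker::SubmitActorTask() rather than a single if/else as shown here.

```cpp
#include <cassert>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical, heavily simplified task specification (not Ray's TaskSpecification).
struct TaskSpec {
  std::string task_id;
  bool is_actor_task = false;
};

// Toy task manager: only remembers which tasks are still pending.
class TaskManager {
 public:
  void AddPendingTask(const TaskSpec &spec) {
    assert(!spec.task_id.empty());  // every task needs an id to be tracked
    pending_.insert(spec.task_id);
  }
  bool IsPending(const std::string &task_id) const {
    return pending_.count(task_id) > 0;
  }

 private:
  std::unordered_set<std::string> pending_;
};

// Toy submitters: they only record what they receive. The real submitters
// resolve dependencies, lease workers and push tasks.
struct ActorTaskSubmitter {
  std::vector<std::string> submitted;
  void SubmitTask(const TaskSpec &spec) { submitted.push_back(spec.task_id); }
};

struct NormalTaskSubmitter {
  std::vector<std::string> submitted;
  void SubmitTask(const TaskSpec &spec) { submitted.push_back(spec.task_id); }
};

// Toy core worker: register the task as pending, then hand it to the
// submitter that is responsible for scheduling it.
struct ToyCoreWorker {
  TaskManager task_manager;
  ActorTaskSubmitter actor_submitter;
  NormalTaskSubmitter normal_submitter;

  void SubmitTask(const TaskSpec &spec) {
    task_manager.AddPendingTask(spec);
    if (spec.is_actor_task) {
      actor_submitter.SubmitTask(spec);
    } else {
      normal_submitter.SubmitTask(spec);
    }
  }
};
```

The task manager treats every task the same way; only the choice of submitter decides how the task actually gets scheduled.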

Does this answer your question?

Thanks @jovany-wang. I did see that code, CoreWorkerDirectTaskSubmitter::SubmitTask(TaskSpecification task_spec), and I think it is a very critical snippet. However, I have multiple questions about what happens inside the function. Can you help answer them?

  1. When the lease client calls RequestWorkerLease in CoreWorkerDirectTaskSubmitter, I think the node manager handles the request in HandleRequestWorkerLease. Can you tell me where it assigns a suitable worker to this lease, and how the raylet collaborates with the GCS to lease a remote worker?
  2. After it gets a suitable worker, it finally calls PushNormalTask on CoreWorkerClientInterface. If the task is a Java task, how does the remote Java worker execute it? Can you describe what happens once PushNormalTask is called?

Thank you for your response and help.

Sorry for the delayed reply.

(1)
It depends on the kind of task. If it’s a normal task, node_manager::HandleRequestWorkerLease() will reject the request if this raylet is not able to start a worker to run the task (see this line in ray/node_manager.cc, ray-project/ray on GitHub, master branch). The direct_task_submitter then receives the reply and requests a worker lease from a new raylet instead (see this line in ray/direct_task_transport.cc, ray-project/ray on GitHub, master branch).
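A toy C++ model of this reject-and-retry loop is sketched below. ToyRaylet, LeaseReply, LeaseWorker and the max_hops limit are all invented for illustration, and the assumption that a rejecting raylet names the next raylet to try is a simplification, not Ray's real reply format.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>

// Hypothetical lease reply (field names are invented for this sketch).
struct LeaseReply {
  std::optional<std::string> worker_id;  // set when the lease is granted
  std::optional<std::string> retry_at;   // set when the raylet turns it down
};

// Toy raylet: grants a lease only while it has an idle worker slot.
struct ToyRaylet {
  std::string name;
  int idle_workers = 0;
  std::string fallback;  // raylet to suggest when full ("" = no suggestion)

  LeaseReply HandleRequestWorkerLease() {
    assert(idle_workers >= 0);
    if (idle_workers > 0) {
      --idle_workers;
      return {name + "-worker-" + std::to_string(idle_workers), std::nullopt};
    }
    if (fallback.empty()) return {std::nullopt, std::nullopt};
    return {std::nullopt, fallback};
  }
};

// Toy submitter side: ask the first raylet, follow retry hints, and stop
// after max_hops so a misconfigured cluster cannot loop forever.
std::optional<std::string> LeaseWorker(std::map<std::string, ToyRaylet> &cluster,
                                       std::string raylet, int max_hops = 8) {
  for (int hop = 0; hop < max_hops; ++hop) {
    auto it = cluster.find(raylet);
    if (it == cluster.end()) return std::nullopt;  // unknown raylet
    LeaseReply reply = it->second.HandleRequestWorkerLease();
    if (reply.worker_id) return reply.worker_id;  // granted: push the task next
    if (!reply.retry_at) return std::nullopt;     // rejected, no hint: give up
    raylet = *reply.retry_at;                     // re-request at another raylet
  }
  return std::nullopt;
}
```

With raylet "A" full and pointing at "B", the first LeaseWorker(cluster, "A") call ends up with a worker on "B"; once "B" is full too, the call gives up.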

If it’s an actor creation task, execution never reaches direct_task_submitter->LeaseWorker(). Instead, the worker sends a CreateActor request to the GCS, and the GCS chooses a raylet to run the task and calls RequestWorkerLease on that raylet. The raylet then handles it in HandleRequestWorkerLease() as well, which is why you can see some IsActorCreationTask() conditions in HandleRequestWorkerLease().
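For contrast, here is a toy C++ model of the actor creation path. ToyGcs, ToyNode and the "most idle workers first" placement policy are hypothetical; the only point carried over from the answer is that the GCS, not the submitting worker, picks the raylet and requests the worker lease.

```cpp
#include <algorithm>
#include <cassert>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical node as seen by the toy GCS.
struct ToyNode {
  std::string name;
  int idle_workers = 0;
  int actor_workers = 0;  // workers handed out for actor creation

  // Toy lease handler shared by both paths. The flag mimics the kind of
  // IsActorCreationTask() check the real handler makes; here it only
  // affects bookkeeping.
  std::optional<std::string> HandleRequestWorkerLease(bool is_actor_creation) {
    if (idle_workers == 0) return std::nullopt;  // reject the lease
    --idle_workers;
    if (is_actor_creation) ++actor_workers;
    return name + "-worker";
  }
};

// Toy GCS: the owner calls CreateActor() instead of leasing a worker itself.
// The GCS picks a node (most idle workers first, an invented policy) and
// requests a worker lease from it, moving on to the next node on rejection.
class ToyGcs {
 public:
  explicit ToyGcs(std::vector<ToyNode> nodes) : nodes_(std::move(nodes)) {
    assert(!nodes_.empty());
  }

  std::optional<std::string> CreateActor(const std::string &actor_name) {
    std::vector<ToyNode *> candidates;
    for (ToyNode &node : nodes_) candidates.push_back(&node);
    std::stable_sort(candidates.begin(), candidates.end(),
                     [](const ToyNode *a, const ToyNode *b) {
                       return a->idle_workers > b->idle_workers;
                     });
    for (ToyNode *node : candidates) {
      if (auto worker = node->HandleRequestWorkerLease(/*is_actor_creation=*/true)) {
        placements_[actor_name] = node->name;
        return worker;
      }
    }
    return std::nullopt;  // no node can host the actor right now
  }

  std::string NodeOf(const std::string &actor_name) const {
    auto it = placements_.find(actor_name);
    return it == placements_.end() ? std::string() : it->second;
  }

 private:
  std::vector<ToyNode> nodes_;
  std::map<std::string, std::string> placements_;
};
```

Centralizing the decision in the GCS means it knows where every actor lives, which a submitting worker leasing on its own would not.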

(2) The Java worker process is started by the WorkerPool in the node manager. When the submitting worker pushes a normal task to it (the PushNormalTask call), CoreWorkerDirectTaskReceiver::HandleTask() is invoked via CoreWorker::HandlePushTask() (see ray/direct_actor_transport.cc, ray-project/ray on GitHub, master branch). normal_scheduling_queue_->Add() appends the task to the queue, and normal_scheduling_queue_->ScheduleRequests() then invokes the accept_callback, which ends up calling the task execution callback that the Java runtime sets up in ray/io_ray_runtime_RayNativeRuntime.cc (ray-project/ray on GitHub, master branch).
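A toy C++ model of the receiving side follows. PushedTask, ToyNormalSchedulingQueue and ToyTaskReceiver are hypothetical names; the sketch only mirrors the order described above (HandleTask, then Add, then ScheduleRequests, then the accept callback), with a std::function standing in for the JNI call into the Java worker.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical pushed task: just a function name and integer arguments.
struct PushedTask {
  std::string function_name;
  std::vector<int> args;
};

// Toy normal-task queue: Add() only enqueues; ScheduleRequests() drains the
// queue and runs each task through its accept callback.
class ToyNormalSchedulingQueue {
 public:
  using AcceptCallback = std::function<void(const PushedTask &)>;

  void Add(PushedTask task, AcceptCallback accept) {
    pending_.push_back(Entry{std::move(task), std::move(accept)});
  }

  void ScheduleRequests() {
    while (!pending_.empty()) {
      Entry entry = std::move(pending_.front());
      pending_.pop_front();
      assert(entry.accept);  // every queued task needs an accept callback
      entry.accept(entry.task);
    }
  }

 private:
  struct Entry {
    PushedTask task;
    AcceptCallback accept;
  };
  std::deque<Entry> pending_;
};

// Toy receiver: HandleTask() queues the task with an accept callback that
// forwards to a language-specific execution callback. For a Java worker that
// callback would call into the JVM; here it is a plain std::function.
class ToyTaskReceiver {
 public:
  using ExecutionCallback = std::function<int(const PushedTask &)>;

  explicit ToyTaskReceiver(ExecutionCallback execute) : execute_(std::move(execute)) {}

  void HandleTask(PushedTask task) {
    queue_.Add(std::move(task), [this](const PushedTask &t) {
      results.push_back(execute_(t));
    });
    queue_.ScheduleRequests();
  }

  std::vector<int> results;  // return values, in execution order

 private:
  ExecutionCallback execute_;
  ToyNormalSchedulingQueue queue_;
};
```

Keeping the execution callback as an injected function is what lets the same C++ receiving code serve Java, Python or C++ workers; only the callback differs.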

@JUAN_CHEN If you’re more interested in how the Ray Java worker works, Slack would be a better place to talk about the details.

Thank you @jovany-wang . I am very interested in how the worker works. I will go to Slack. Thank you.