How to get the running task of a synchronous, single-threaded actor

Hello, I want to call many different methods in sequence on a synchronous, single-threaded actor. These methods can take a long time, and I want to watch the progress. Does Ray have an interface for this?
Something like ray.get_actor_running_task(actor)?
Some code is shown below:


import ray
import os
import time
import random
import sys
from string import ascii_lowercase

# Initialise Ray before any actor is defined or created.
ray.init()

def mock_crash(threshold=0.01):
    """Randomly terminate the current process to imitate a crash.

    One random number in [0, 1) is drawn per call.  If it is below
    ``threshold`` a notice is printed and the interpreter exits through
    ``sys.exit(-1)``; otherwise the function returns ``None``.

    Args:
        threshold: Value the random draw is compared against.
    """
    should_crash = random.random() < threshold
    if not should_crash:
        return
    print("mock crash now")
    sys.exit(-1)
        
@ray.remote
class ReentrantCounter:
    """Counter actor whose starting value may come from checkpoint metadata."""

    def __init__(self, value, checkpoint_metadata=None):
        """Set the initial counter value.

        Args:
            value: Starting value used when no checkpoint metadata is given.
            checkpoint_metadata: Optional mapping; when provided, its
                ``'value'`` entry is used instead of ``value``.
        """
        print("actor pid is {}".format(os.getpid()))
        self.value = (
            value if checkpoint_metadata is None
            else checkpoint_metadata['value']
        )

    def increment(self, i=1):
        """Add ``i`` to the counter, simulate slow work, return the new value.

        Args:
            i: Amount added to the counter.

        Returns:
            The counter value after the addition.
        """
        print("actor pid is {}".format(os.getpid()))
        self.value += i
        # Stand-in for a long-running step: sleep for up to three seconds.
        pause_seconds = random.random() * 3
        time.sleep(pause_seconds)
        # mock_crash()
        return self.value


def make_name(n=10):
    """Return a random string made of ``n`` lowercase ASCII letters."""
    letters = []
    for _ in range(n):
        letters.append(random.choice(ascii_lowercase))
    return ''.join(letters)

# Give the actor a random name and create it with an initial value of 1.
actor_name = make_name()
print("create actor with name {} now".format(actor_name))
actor = ReentrantCounter.options(name=actor_name).remote(1)

# Submit ten increment calls without waiting for any of their results.
async_reses = [actor.increment.remote(i) for i in range(10)]

# Hypothetical API, shown only to illustrate the question: is there a call
# like this that reports which task the actor is currently running?
ray.get_actor_running_task(actor)

The experimental State API can be used for this. The code below can tell which task is currently running on the actor.

Short version:

# `list_tasks` is imported from ray.experimental.state.api (see the long
# version below).  `obj_ref` must hold the actor's ID as a string; the long
# version derives it from the actor handle.
tasks = list_tasks(filters=[
    ("scheduling_state", "=", "RUNNING"),
    ("actor_id", "=", obj_ref),
])

Long version, which prints the name of the task currently running on the actor `a`:

#!/usr/bin/env python3

import ray
import time

# Initialise Ray before any actor is defined or created.
ray.init()

@ray.remote
class A:
    """Actor exposing three methods that each print a tag and sleep a second."""

    def _log_then_sleep(self, tag):
        """Print ``tag`` and then block for one second."""
        print(tag)
        time.sleep(1)

    def serve1(self):
        """Print ``sleep1`` and sleep for one second."""
        self._log_then_sleep('sleep1')

    def serve2(self):
        """Print ``sleep2`` and sleep for one second."""
        self._log_then_sleep('sleep2')

    def serve3(self):
        """Print ``sleep3`` and sleep for one second."""
        self._log_then_sleep('sleep3')

def get_fn(i, a, b, c):
    """Select ``a``, ``b`` or ``c`` in round-robin order according to ``i``.

    Args:
        i: Index; its remainder modulo 3 decides which argument is returned.
        a: Returned when ``i % 3 == 0``.
        b: Returned when ``i % 3 == 1``.
        c: Returned when ``i % 3 == 2``.
    """
    candidates = (a, b, c)
    slot = i % 3
    return candidates[slot]

a = A.remote()

# Block until a first call has finished before queuing the rest.
ray.get(a.serve1.remote())

# Queue 100 calls on the actor, cycling through serve1, serve2 and serve3.
futures = [get_fn(i, a.serve1, a.serve2, a.serve3).remote() for i in range(100)]

# Despite its name, `obj_ref` holds the actor's ID as a hex string.  It is
# read from the handle rather than parsed out of str(a), because parsing
# depends on the class name and on the exact repr format.
obj_ref = a._actor_id.hex()

from ray.experimental.state.api import get_actor, list_tasks

# Poll twice a second and print the task that is running on the actor.
# Stop once every queued call has finished instead of polling forever.
pending = futures
while pending:
    time.sleep(0.5)
    tasks = list_tasks(filters=[
        ("scheduling_state", "=", "RUNNING"),
        ("actor_id", "=", obj_ref),
    ])

    if tasks:
        print(tasks[0]['func_or_class_name'])

    # timeout=0 makes this a non-blocking check; keep only unfinished calls.
    _, pending = ray.wait(pending, num_returns=len(pending), timeout=0)