Ray Actor and TensorFlow Logging

I have been using RLlib and Tune for a while now, but have never used the ray.remote decorator for my own multiprocessing until now.

My project requires me to run Population-Based Training on DQN and concurrently evaluate the checkpoints whenever they are made. Evaluation is done with custom code, so I don’t think the evaluation config in the Trainer API is an option.

Anyway, I wrote the following dummy code to test whether I can run a Server class that holds the logging object as an instance attribute and logs the values it is given. I’m having trouble explaining this code since, frankly, I’m not sure what I want. My point is that the following code does not create any TensorBoard files. If anyone knows why I’m not getting any files, please let me know.

TL;DR: The code below does not produce any TensorBoard log files. Please help me.

import os
from os import sep

import ray
import tensorflow as tf


class TBLogger:
    def __init__(self):
        self.tensorboard_loggers = {}

        for policy_num in range(6):
            print(f"Policy-Eval {policy_num} made.")
            eval_name = "EVAL-DEBUG"
            tensorboard_log_dir = os.path.join(os.getcwd(), eval_name, "EVAL_" + str(policy_num))
            self.tensorboard_loggers[policy_num] = tf.summary.create_file_writer(tensorboard_log_dir)

    def log(self, iter):
        for policy_writer_num in range(6):
            with self.tensorboard_loggers[policy_writer_num].as_default():
                tf.summary.scalar(name="Reward", data=iter+10, step=iter)
                tf.summary.scalar(name="Utilization", data=iter+20, step=iter)
                tf.summary.scalar(name="A-Taping", data=iter+30, step=iter)
                tf.summary.scalar(name="C-Packing", data=iter+40, step=iter)
                tf.summary.scalar(name="RTF", data=iter+50, step=iter)
                self.tensorboard_loggers[policy_writer_num].flush()


@ray.remote
class Server:
    def __init__(self):
        self.logger = TBLogger()

    def start(self):
        for i in range(100):
            self.logger.log(i)


if __name__ == "__main__":
    ray.init()
    server = ray.get(Server.remote())
    server.start.remote()

Can you print the working directory (see "Find current directory and file's directory" on Stack Overflow) within the TBLogger class and see if files are generated there?

I did the following:

class TBLogger:
    def __init__(self):
        import tensorflow as tf

        self.tensorboard_loggers = {}

        for policy_num in range(6):
            eval_name = "EVAL-DEBUG"
            tensorboard_log_dir = os.path.join(os.getcwd(), "LOGS", eval_name, "EVAL_" + str(policy_num))
            self.tensorboard_loggers[policy_num] = tf.summary.create_file_writer(tensorboard_log_dir)

            print(os.path.dirname(os.path.realpath(__file__)))
            print(os.getcwd())

    def log(self, iter):
        import tensorflow as tf

        for policy_writer_num in range(6):
            with self.tensorboard_loggers[policy_writer_num].as_default():
                tf.summary.scalar(name="Reward", data=iter+10, step=iter)
                tf.summary.scalar(name="Utilization", data=iter+20, step=iter)
                tf.summary.scalar(name="A-Taping", data=iter+30, step=iter)
                tf.summary.scalar(name="C-Packing", data=iter+40, step=iter)
                tf.summary.scalar(name="RTF", data=iter+50, step=iter)
                
                print(os.path.dirname(os.path.realpath(__file__)))
                print(os.getcwd())
                
                self.tensorboard_loggers[policy_writer_num].flush()

I got no print output whatsoever and, of course, no TensorBoard files were created in any directory.
It seems like calling TBLogger() within the Ray Actor Server via remote() is not possible? But I don’t see why that would be the case.

I’m trying to debug this on Windows 10, and the same thing happens on Linux (CentOS 7), FYI.

Hmm, interesting. Your original script is supposed to run print(f"Policy-Eval {policy_num} made."). Is that not printed either?

@sangcho, sorry for replying late.
I re-ran my original code and it gives the following error:

C:\ProgramData\Anaconda3\envs\venv_Ray\python.exe C:/Users/kaiyu/Desktop/aps-ray-rllib/FullCycle/ray_remote_testing.py
2021-06-23 09:41:15.506880: W tensorflow/stream_executor/platform/default/dso_loader.cc:59] Could not load dynamic library 'cudart64_101.dll'; dlerror: cudart64_101.dll not found
2021-06-23 09:41:15.507249: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2021-06-23 09:41:27,007	INFO services.py:1267 -- View the Ray dashboard at http://127.0.0.1:8265
Traceback (most recent call last):
  File "C:/Users/kaiyu/Desktop/aps-ray-rllib/FullCycle/ray_remote_testing.py", line 41, in <module>
    server = ray.get(Server.remote())
  File "C:\Users\kaiyu\AppData\Roaming\Python\Python38\site-packages\ray\_private\client_mode_hook.py", line 47, in wrapper
    return func(*args, **kwargs)
  File "C:\Users\kaiyu\AppData\Roaming\Python\Python38\site-packages\ray\worker.py", line 1468, in get
    raise ValueError("'object_refs' must either be an object ref "
ValueError: 'object_refs' must either be an object ref or a list of object refs.
(pid=8648) 2021-06-23 09:41:38.575872: W tensorflow/stream_executor/platform/default/dso_loader.cc:59] Could not load dynamic library 'cudart64_101.dll'; dlerror: cudart64_101.dll not found
(pid=8648) 2021-06-23 09:41:38.579814: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.

I suppose trying to “get” a Ray remote actor gives an error since Server.remote() returns an actor handle rather than an object ref that can be fetched.
Thus, I changed the main statement to the following:

if __name__ == "__main__":
    ray.init()
    server = Server.remote()     # Note how I got rid of the `get` function.
    server.start.remote()

Now it doesn’t give an error, but it also doesn’t print the "Policy-Eval {policy_num} made." messages, nor does it create any TensorBoard logging directories. Here’s the log:

C:\ProgramData\Anaconda3\envs\venv_Ray\python.exe C:/Users/kaiyu/Desktop/aps-ray-rllib/FullCycle/ray_remote_testing.py
2021-06-23 09:44:05.817426: W tensorflow/stream_executor/platform/default/dso_loader.cc:59] Could not load dynamic library 'cudart64_101.dll'; dlerror: cudart64_101.dll not found
2021-06-23 09:44:05.817800: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2021-06-23 09:44:16,894	INFO services.py:1267 -- View the Ray dashboard at http://127.0.0.1:8265
(pid=10496) 2021-06-23 09:44:28.722859: W tensorflow/stream_executor/platform/default/dso_loader.cc:59] Could not load dynamic library 'cudart64_101.dll'; dlerror: cudart64_101.dll not found
(pid=10496) 2021-06-23 09:44:28.726059: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.

Process finished with exit code 0

If I get rid of the ray.remote decorator and run the exact same code shown below, I get all the print statements and the log directories are made and data is logged:

import os

import ray
import tensorflow as tf


class TBLogger:
    def __init__(self):
        self.tensorboard_loggers = {}

        for policy_num in range(6):
            print(f"Policy-Eval {policy_num} made.")
            eval_name = "EVAL-DEBUG"
            tensorboard_log_dir = os.path.join(os.getcwd(), eval_name, "EVAL_" + str(policy_num))
            self.tensorboard_loggers[policy_num] = tf.summary.create_file_writer(tensorboard_log_dir)

    def log(self, iter):
        for policy_writer_num in range(6):
            with self.tensorboard_loggers[policy_writer_num].as_default():
                tf.summary.scalar(name="Reward", data=iter+10, step=iter)
                tf.summary.scalar(name="Utilization", data=iter+20, step=iter)
                tf.summary.scalar(name="A-Taping", data=iter+30, step=iter)
                tf.summary.scalar(name="C-Packing", data=iter+40, step=iter)
                tf.summary.scalar(name="RTF", data=iter+50, step=iter)
                self.tensorboard_loggers[policy_writer_num].flush()


class Server:
    def __init__(self):
        self.logger = TBLogger()

    def start(self):
        for i in range(100):
            self.logger.log(i)


if __name__ == "__main__":
    ray.init()
    server = Server()
    server.start()

Hi @Kai_Yun,

Have you tried putting a get around the start.remote()?

With this code here:

if __name__ == "__main__":
    ray.init()
    server = Server.remote()
    server.start.remote()

What is happening is that remote calls are non-blocking, so the call returns immediately and the program exits before start even begins running, at which point everything is shut down.

In Ray, an actor or remote call can be cleaned up whenever its reference handle goes out of scope. If, for example, you modified this to have a long sleep after calling start.remote(), you might still not see any results, since you did not assign the reference returned by remote to a variable.

Thanks for the response @mannyv.
Based on your comment, I first tried the following code. As you can see, I got rid of the ray decorator from the Server class and just added it to the start method. Then wrapped a get around the start.remote() as recommended.

import os

import ray


class TBLogger:
    def __init__(self):
        self.tensorboard_loggers = {}
        print("Init TBLogger.")

    def init_logger(self):
        import tensorflow as tf

        for policy_num in range(6):
            eval_name = "EVAL-DEBUG"
            tensorboard_log_dir = os.path.join(os.getcwd(), "../LOGS", eval_name, "EVAL_" + str(policy_num))
            self.tensorboard_loggers[policy_num] = tf.summary.create_file_writer(tensorboard_log_dir)

            print(f"TB Logger for eval-{policy_num} made!")

    def log(self, iter):
        import tensorflow as tf

        for policy_writer_num in range(6):
            with self.tensorboard_loggers[policy_writer_num].as_default():
                tf.summary.scalar(name="Reward", data=iter+10, step=iter)
                tf.summary.scalar(name="Utilization", data=iter+20, step=iter)
                tf.summary.scalar(name="A-Taping", data=iter+30, step=iter)
                tf.summary.scalar(name="C-Packing", data=iter+40, step=iter)
                tf.summary.scalar(name="RTF", data=iter+50, step=iter)

                print(f"Logged iteration {iter} for policy-{policy_writer_num}.")

                self.tensorboard_loggers[policy_writer_num].flush()


class Server:
    def __init__(self):
        print("Server class")

    def init_logger(self):
        self.logger = TBLogger()
        self.logger.init_logger()

    @ray.remote
    def start(self):
        for i in range(100):
            self.logger.log(i)


if __name__ == "__main__":
    ray.init()
    server = Server()
    server.init_logger()
    ray.get(server.start.remote())

This successfully makes the tensorboard logging directories and file writers since the init_logger method is not a ray function. However, it produces the following error:

C:\ProgramData\Anaconda3\envs\venv_Ray\python.exe C:/Users/kaiyu/Desktop/aps-ray-rllib/FullCycle/etc/tb_test.py
2021-06-24 09:05:16,452	INFO services.py:1267 -- View the Ray dashboard at http://127.0.0.1:8265
Server class
Init TBLogger.

--- I got rid of all the tensorflow related logs ---

TB Logger for eval-0 made!
TB Logger for eval-1 made!
TB Logger for eval-2 made!
TB Logger for eval-3 made!
TB Logger for eval-4 made!
TB Logger for eval-5 made!

Traceback (most recent call last):
  File "C:/Users/kaiyu/Desktop/aps-ray-rllib/FullCycle/etc/tb_test.py", line 55, in <module>
    ray.get(server.start.remote())
  File "C:\Users\kaiyu\AppData\Roaming\Python\Python38\site-packages\ray\remote_function.py", line 104, in _remote_proxy
    return self._remote(args=args, kwargs=kwargs)
  File "C:\Users\kaiyu\AppData\Roaming\Python\Python38\site-packages\ray\remote_function.py", line 307, in _remote
    return invocation(args, kwargs)
  File "C:\Users\kaiyu\AppData\Roaming\Python\Python38\site-packages\ray\remote_function.py", line 275, in invocation
    list_args = ray._private.signature.flatten_args(
  File "C:\Users\kaiyu\AppData\Roaming\Python\Python38\site-packages\ray\_private\signature.py", line 116, in flatten_args
    raise TypeError(str(exc)) from None
TypeError: missing a required argument: 'self'

Process finished with exit code 1

So, is it safe to think that I can’t wrap an individual method within a class with a ray decorator? Seems like only functions and entire classes can use ray decorators.

Anyways, I tried your original recommendation like the following code:

@ray.remote
class Server:
    def __init__(self):
        print("Server class")

    def init_logger(self):
        self.logger = TBLogger()
        self.logger.init_logger()

    def start(self):
        for i in range(100):
            self.logger.log(i)


if __name__ == "__main__":
    ray.init()
    server = Server.remote()
    server.init_logger.remote()
    ray.get(server.start.remote())

And it works like a charm!

However, I wanted to avoid this since I wanted this start method to run in the background.
As you can read from my original post, I am trying to run DQN with PBT while my custom evaluator client and logging server run in the background. The client constantly checks for new checkpoints and evaluates each one, then sends the evaluation result to the server, which logs the data with TensorBoard writers. I want to do all of this in one script. The reason I want to avoid wrapping the start method in a get is that get blocks, so nothing else could run if I put all of this functionality into one script.
Now that I think about it, it may not be necessary to implement all of this in one script. But if I do, am I right that using get will block and prevent the other calls from being executed?

Here’s a pseudocode of what I’d like to implement:

@ray.remote
def train():
    results = tune.run("DQN", scheduler=pbt, ...)
    return results


@ray.remote
class LoggerServer:
    def init_server(self):
        pass

    def init_logger(self):
        # Initializes tensorboard loggers.
        pass

    def start_logging(self):
        # Runs a while loop that constantly checks for
        # new evaluation results from the client.
        # Logs the results whenever available.
        pass


@ray.remote
class ReplayerClient:
    def init_client(self):
        pass

    def check_new_checkpoints(self):
        # Checks if any new checkpoint from training
        # is made.
        pass

    def init_replayer(self, checkpoint_path):
        # Initializes the "replayer" for evaluation.
        # i.e., initializes the simulator for eval
        # and a Ray DQNTrainer with given checkpoint.
        pass

    def start_replay(self):
        # Runs a while loop that constantly calls
        # `check_new_checkpoints` and runs `init_replayer`
        # whenever new checkpoint is available.
        pass


if __name__ == "__main__":
    # Starts running `train` in the background.
    # This runs without any issue.
    train_results = train.remote()

    # Inits `LoggerServer` and `ReplayerClient`.
    server = LoggerServer.remote()
    client = ReplayerClient.remote()

    # Starts evaluation and logging process.
    server.start_logging.remote()
    client.start_replay.remote()
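
The check for new checkpoints in the pseudocode above could be sketched with the standard library alone, roughly as follows. This assumes, hypothetically, that every checkpoint is written to a directory whose name starts with "checkpoint_"; the actual layout depends on the Tune/RLlib version and should be checked on disk. The function name and the seen set are illustrative.

```python
import os
import tempfile


def find_new_checkpoints(root, seen):
    """Return checkpoint directories under `root` that are not yet in `seen`.

    Assumption: checkpoint directories are named "checkpoint_<something>".
    Newly found paths are added to `seen` so they are reported only once.
    """
    found = []
    for dirpath, dirnames, _ in os.walk(root):
        for name in sorted(dirnames):
            path = os.path.join(dirpath, name)
            if name.startswith("checkpoint_") and path not in seen:
                seen.add(path)
                found.append(path)
    return found


if __name__ == "__main__":
    # Tiny demonstration on a throwaway directory tree.
    demo_root = tempfile.mkdtemp()
    os.makedirs(os.path.join(demo_root, "trial_0", "checkpoint_1"))
    already_seen = set()
    print(find_new_checkpoints(demo_root, already_seen))  # one new path
    print(find_new_checkpoints(demo_root, already_seen))  # nothing new
```

A polling loop would call this periodically and hand each returned path to the evaluation step.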

As you can see, I’m very new to coding with Ray functions, so if you can take another look at this, it’ll be very helpful.

Thanks again!

Hi,

Glad you are making progress. One comment that may help. You do not have to call get if you do not need a result. You can do something like:

server_start_task = server.start.remote()
#... all the code you want to run
ray.kill(server)  # ray.kill expects the actor handle, not the ref returned by start.remote()

I should have read all the way to the end. The sample you have should work (assuming all the code inside those calls works of course).

You are really close; you are just missing one thing. All of those remote calls are non-blocking, so after the script issues them, the program exits and everything ends almost immediately.

Add a ray.get(train_results) at the very end and you should be good.

You might need to add a sleep or something after that get to make sure the last checkpoint is processed before ending. Perhaps you could add a shutdown method for the server and client that you call when training is done. Then you would do ray.get(server.shutdown.remote()).
Same for the client.

Finally, you should also assign the server/client.start_…remote() to variables or they might be terminated automatically and too soon since there are no more references to the handles.


Thank you so much for the detailed response!

I’ll need to change my original code a bit to try this. I’ll do so as soon as I can and get back to you.
Again, I really appreciate all the help.
