Ray Tune process hangs - thread synchronization issue?

When training an RLlib model using Ray Tune with many experiments, the process hangs. When the process is manually terminated, the following message is displayed, which seems to suggest a problem with synchronization between threads.

Process exited with code: 1
PC: @ 0x7f8ceecea71e (unknown) pthread_cond_timedwait@@GLIBC_2.3.2

Is this a known problem? Is there a workaround?

The setup is Ray 2.1.0 running on an AWS c5 instance type with Ubuntu 18.04.

Thanks.
Stefan

No, I am not aware of this issue.
Can you run the same program with just a single trial? Does it still hang?

This only seems to happen with many trials: 1 trial works, 20 trials also work, but with 40 trials the process hangs. This is running on a c5.24xlarge AWS instance type with 96 vCPUs.

Can you provide the full console log? Are the trials hanging in PENDING or RUNNING?
Can you also attach your script?
Can you run ray stack while things are hanging?

Console log:
(AIRDQN pid=20592) 2023-02-09 21:56:44,973 INFO simple_q.py:308 -- In multi-agent mode, policies will be optimized sequentially by the multi-GPU optimizer. Consider setting simple_optimizer=True if this doesn't work for you.
(AIRDQN pid=20592) 2023-02-09 21:56:44,974 INFO algorithm.py:460 -- Current log_level is WARN. For more information, set 'log_level': 'INFO' / 'DEBUG' or use the -v and -vv flags.
(AIRDQN pid=20669) 2023-02-09 21:56:44,973 INFO simple_q.py:308 -- In multi-agent mode, policies will be optimized sequentially by the multi-GPU optimizer. Consider setting simple_optimizer=True if this doesn't work for you.
(AIRDQN pid=20669) 2023-02-09 21:56:44,974 INFO algorithm.py:460 -- Current log_level is WARN. For more information, set 'log_level': 'INFO' / 'DEBUG' or use the -v and -vv flags.
(AIRDQN pid=20666) 2023-02-09 21:56:44,978 INFO simple_q.py:308 -- In multi-agent mode, policies will be optimized sequentially by the multi-GPU optimizer. Consider setting simple_optimizer=True if this doesn't work for you.
(AIRDQN pid=20666) 2023-02-09 21:56:44,979 INFO algorithm.py:460 -- Current log_level is WARN. For more information, set 'log_level': 'INFO' / 'DEBUG' or use the -v and -vv flags.
Read: 0%| | 0/1 [00:00<?, ?it/s]
Read: 0%| | 0/1 [00:00<?, ?it/s]
(AIRDQN pid=20667) 2023-02-09 21:56:44,988 INFO simple_q.py:308 -- In multi-agent mode, policies will be optimized sequentially by the multi-GPU optimizer. Consider setting simple_optimizer=True if this doesn't work for you.
(AIRDQN pid=20667) 2023-02-09 21:56:44,989 INFO algorithm.py:460 -- Current log_level is WARN. For more information, set 'log_level': 'INFO' / 'DEBUG' or use the -v and -vv flags.
Read: 0%| | 0/1 [00:00<?, ?it/s]
Read: 0%| | 0/1 [00:00<?, ?it/s]
(AIRDQN pid=20668) 2023-02-09 21:56:45,242 INFO simple_q.py:308 -- In multi-agent mode, policies will be optimized sequentially by the multi-GPU optimizer. Consider setting simple_optimizer=True if this doesn't work for you.
(AIRDQN pid=20668) 2023-02-09 21:56:45,243 INFO algorithm.py:460 -- Current log_level is WARN. For more information, set 'log_level': 'INFO' / 'DEBUG' or use the -v and -vv flags.
Read: 0%| | 0/1 [00:00<?, ?it/s]
(AIRDQN pid=19972) /home/ec2-user/.local/lib/python3.7/site-packages/ray/rllib/utils/filter.py:84: DeprecationWarning: Passing None into shape arguments as an alias for () is deprecated.
(AIRDQN pid=19972) self.mean_array = np.zeros(shape)
(RolloutWorker pid=20076) /home/ec2-user/.local/lib/python3.7/site-packages/ray/rllib/utils/filter.py:84: DeprecationWarning: Passing None into shape arguments as an alias for () is deprecated.
(RolloutWorker pid=20076) self.mean_array = np.zeros(shape)

Here is the tail end of the ray stack output when the process hangs. How can I attach a file to share the entire ray stack output?

Stack dump for ec2-user 37113 0.8 0.2 64578864 453176 pts/1 SNl+ 22:09 0:11 ray::AIRRLTrainer.__init__()
Process 37113: ray::AIRRLTrainer.__init__()
Python v3.7.16 (/usr/bin/python3.7)

Thread 37113 (idle): "MainThread"
pthread_cond_timedwait@@GLIBC_2.3.2 (libpthread-2.26.so)
ray::core::GetRequest::Wait (ray/_raylet.so)
ray::core::CoreWorkerMemoryStore::GetImpl (ray/_raylet.so)
ray::core::CoreWorkerMemoryStore::Wait (ray/_raylet.so)
ray::core::CoreWorker::Wait (ray/_raylet.so)
wait (ray/_raylet.so)
wait (ray/_private/worker.py:2481)
wrapper (ray/_private/client_mode_hook.py:105)
fetch_until_complete (ray/data/_internal/progress_bar.py:74)
_apply (ray/data/_internal/compute.py:115)
__call__ (ray/data/_internal/plan.py:672)
execute (ray/data/_internal/plan.py:309)
__init__ (ray/data/dataset.py:217)
repartition (ray/data/dataset.py:810)
get_dataset_and_shards (ray/rllib/offline/dataset_reader.py:179)
__init__ (ray/rllib/evaluation/worker_set.py:131)
setup (ray/rllib/algorithms/algorithm.py:531)
_resume_span (ray/util/tracing/tracing_helper.py:466)
__init__ (ray/tune/trainable/trainable.py:161)
__init__ (ray/rllib/algorithms/algorithm.py:414)
__init__ (ray/train/rl/rl_trainer.py:214)
_resume_span (ray/util/tracing/tracing_helper.py:466)
actor_method_executor (ray/_private/function_manager.py:674)
function_executor (ray/_raylet.so)
_raylet_task_execution_handler (ray/_raylet.so)
std::_Function_handler<ray::Status(ray::rpc::Address const&, ray::rpc::TaskType, std::string, ray::core::RayFunction const&, std::unordered_map<std::string, double, std::hashstd::string, std::equal_tostd::string, std::allocator<std::pair<std::string const, double> > > const&, std::vector<std::shared_ptrray::RayObject, std::allocator<std::shared_ptrray::RayObject > > const&, std::vector<ray::rpc::ObjectReference, std::allocatorray::rpc::ObjectReference > const&, std::string const&, std::string const&, std::vector<std::pair<ray::ObjectID, std::shared_ptrray::RayObject >, std::allocator<std::pair<ray::ObjectID, std::shared_ptrray::RayObject > > >, std::vector<std::pair<ray::ObjectID, std::shared_ptrray::RayObject >, std::allocator<std::pair<ray::ObjectID, std::shared_ptrray::RayObject > > >, std::shared_ptrray::LocalMemoryBuffer&, bool*, bool*, std::vector<ray::ConcurrencyGroup, std::allocatorray::ConcurrencyGroup > const&, std::string, bool), ray::Status ()(ray::rpc::Address const&, ray::rpc::TaskType, std::string, ray::core::RayFunction const&, std::unordered_map<std::string, double, std::hashstd::string, std::equal_tostd::string, std::allocator<std::pair<std::string const, double> > > const&, std::vector<std::shared_ptrray::RayObject, std::allocator<std::shared_ptrray::RayObject > > const&, std::vector<ray::rpc::ObjectReference, std::allocatorray::rpc::ObjectReference > const&, std::string, std::string, std::vector<std::pair<ray::ObjectID, std::shared_ptrray::RayObject >, std::allocator<std::pair<ray::ObjectID, std::shared_ptrray::RayObject > > >, std::vector<std::pair<ray::ObjectID, std::shared_ptrray::RayObject >, std::allocator<std::pair<ray::ObjectID, std::shared_ptrray::RayObject > > >, std::shared_ptrray::LocalMemoryBuffer&, bool, bool*, std::vector<ray::ConcurrencyGroup, std::allocatorray::ConcurrencyGroup > const&, std::string, bool)>::_M_invoke (ray/_raylet.so)
ray::core::CoreWorker::ExecuteTask (ray/_raylet.so)
std::_Function_handler<ray::Status(ray::TaskSpecification const&, std::shared_ptr<std::unordered_map<std::string, std::vector<std::pair<long, double>, std::allocator<std::pair<long, double> > >, std::hashstd::string, std::equal_tostd::string, std::allocator<std::pair<std::string const, std::vector<std::pair<long, double>, std::allocator<std::pair<long, double> > > > > > >, std::vector<std::pair<ray::ObjectID, std::shared_ptrray::RayObject >, std::allocator<std::pair<ray::ObjectID, std::shared_ptrray::RayObject > > >, std::vector<std::pair<ray::ObjectID, std::shared_ptrray::RayObject >, std::allocator<std::pair<ray::ObjectID, std::shared_ptrray::RayObject > > >, google::protobuf::RepeatedPtrFieldray::rpc::ObjectReferenceCount, bool, bool*), std::_Bind<ray::Status (ray::core::CoreWorker(ray::core::CoreWorker*, std::_Placeholder<1>, std::_Placeholder<2>, std::_Placeholder<3>, std::_Placeholder<4>, std::_Placeholder<5>, std::_Placeholder<6>, std::_Placeholder<7>)::)(ray::TaskSpecification const&, std::shared_ptr<std::unordered_map<std::string, std::vector<std::pair<long, double>, std::allocator<std::pair<long, double> > >, std::hashstd::string, std::equal_tostd::string, std::allocator<std::pair<std::string const, std::vector<std::pair<long, double>, std::allocator<std::pair<long, double> > > > > > > const&, std::vector<std::pair<ray::ObjectID, std::shared_ptrray::RayObject >, std::allocator<std::pair<ray::ObjectID, std::shared_ptrray::RayObject > > >, std::vector<std::pair<ray::ObjectID, std::shared_ptrray::RayObject >, std::allocator<std::pair<ray::ObjectID, std::shared_ptrray::RayObject > > >, google::protobuf::RepeatedPtrFieldray::rpc::ObjectReferenceCount, bool*, bool*)> >::_M_invoke (ray/_raylet.so)
ray::core::CoreWorkerDirectTaskReceiver::HandleTask(ray::rpc::PushTaskRequest, ray::rpc::PushTaskReply*, std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>)::{lambda(std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>)#1}::operator() const (ray/_raylet.so)
std::_Function_handler<void (std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>), ray::core::CoreWorkerDirectTaskReceiver::HandleTask(ray::rpc::PushTaskRequest, ray::rpc::PushTaskReply*, std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>)::{lambda(std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>)#1}>::_M_invoke (ray/_raylet.so)
ray::core::InboundRequest::Accept (ray/_raylet.so)
ray::core::NormalSchedulingQueue::ScheduleRequests (ray/_raylet.so)
EventTracker::RecordExecution (ray/_raylet.so)
std::_Function_handler<void (), instrumented_io_context::post(std::function<void ()>, std::string)::{lambda()#1}>::_M_invoke (ray/_raylet.so)
boost::asio::detail::completion_handler<std::function<void ()>, boost::asio::io_context::basic_executor_type<std::allocator, (unsigned int)0> >::do_complete (ray/_raylet.so)
boost::asio::detail::scheduler::do_run_one (ray/_raylet.so)
boost::asio::detail::scheduler::run (ray/_raylet.so)
boost::asio::io_context::run (ray/_raylet.so)
ray::core::CoreWorker::RunTaskExecutionLoop (ray/_raylet.so)
ray::core::CoreWorkerProcessImpl::RunWorkerTaskExecutionLoop (ray/_raylet.so)
ray::core::CoreWorkerProcess::RunTaskExecutionLoop (ray/_raylet.so)
run_task_loop (ray/_raylet.so)
main_loop (ray/_private/worker.py:763)
(ray/_private/workers/default_worker.py:231)
Thread 38420 (idle): "ray_import_thread"
do_futex_wait (libpthread-2.26.so)
__new_sem_wait_slow (libpthread-2.26.so)
PyThread_acquire_lock_timed (libpython3.7m.so.1.0)
wait (threading.py:300)
_wait_once (grpc/_common.py:112)
wait (grpc/_common.py:157)
result (grpc/_channel.py:735)
_poll_locked (ray/_private/gcs_pubsub.py:249)
poll (ray/_private/gcs_pubsub.py:385)
_run (ray/_private/import_thread.py:70)
run (threading.py:870)
_bootstrap_inner (threading.py:926)
_bootstrap (threading.py:890)
clone (libc-2.26.so)
Thread 38621 (idle): "Thread-2"
do_futex_wait (libpthread-2.26.so)
__new_sem_wait_slow (libpthread-2.26.so)
PyThread_acquire_lock_timed (libpython3.7m.so.1.0)
wait (threading.py:300)
wait (threading.py:552)
run (tqdm/_monitor.py:60)
_bootstrap_inner (threading.py:926)
_bootstrap (threading.py:890)
clone (libc-2.26.so)
Thread 47419 (idle): "Thread-5"
epoll_wait (libc-2.26.so)
0x7f21b28defae (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
0x7f21b28e31a2 (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
0x7f21b28e738f (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
0x7f21b29d7e47 (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
0x7f21b2a452b5 (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
0x7f21b2a45479 (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
0x7f21b2add7b8 (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
0x7f21b2addf5f (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
0x7f21b2adfdb5 (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
channel_spin (grpc/_channel.py:1258)
0x7f21b2a2740c (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
run (threading.py:870)
_bootstrap_inner (threading.py:926)
_bootstrap (threading.py:890)
clone (libc-2.26.so)

Stack dump for ec2-user 37117 0.8 0.2 64578792 453884 pts/1 SNl+ 22:09 0:10 ray::AIRRLTrainer.__init__()
Process 37117: ray::AIRRLTrainer.__init__()
Python v3.7.16 (/usr/bin/python3.7)

Thread 37117 (idle): "MainThread"
pthread_cond_timedwait@@GLIBC_2.3.2 (libpthread-2.26.so)
ray::core::GetRequest::Wait (ray/_raylet.so)
ray::core::CoreWorkerMemoryStore::GetImpl (ray/_raylet.so)
ray::core::CoreWorkerMemoryStore::Wait (ray/_raylet.so)
ray::core::CoreWorker::Wait (ray/_raylet.so)
wait (ray/_raylet.so)
wait (ray/_private/worker.py:2481)
wrapper (ray/_private/client_mode_hook.py:105)
fetch_until_complete (ray/data/_internal/progress_bar.py:74)
_apply (ray/data/_internal/compute.py:115)
__call__ (ray/data/_internal/plan.py:672)
execute (ray/data/_internal/plan.py:309)
__init__ (ray/data/dataset.py:217)
repartition (ray/data/dataset.py:810)
get_dataset_and_shards (ray/rllib/offline/dataset_reader.py:179)
__init__ (ray/rllib/evaluation/worker_set.py:131)
setup (ray/rllib/algorithms/algorithm.py:531)
_resume_span (ray/util/tracing/tracing_helper.py:466)
__init__ (ray/tune/trainable/trainable.py:161)
__init__ (ray/rllib/algorithms/algorithm.py:414)
__init__ (ray/train/rl/rl_trainer.py:214)
_resume_span (ray/util/tracing/tracing_helper.py:466)
actor_method_executor (ray/_private/function_manager.py:674)
function_executor (ray/_raylet.so)
_raylet_task_execution_handler (ray/_raylet.so)
std::_Function_handler<ray::Status(ray::rpc::Address const&, ray::rpc::TaskType, std::string, ray::core::RayFunction const&, std::unordered_map<std::string, double, std::hashstd::string, std::equal_tostd::string, std::allocator<std::pair<std::string const, double> > > const&, std::vector<std::shared_ptrray::RayObject, std::allocator<std::shared_ptrray::RayObject > > const&, std::vector<ray::rpc::ObjectReference, std::allocatorray::rpc::ObjectReference > const&, std::string const&, std::string const&, std::vector<std::pair<ray::ObjectID, std::shared_ptrray::RayObject >, std::allocator<std::pair<ray::ObjectID, std::shared_ptrray::RayObject > > >, std::vector<std::pair<ray::ObjectID, std::shared_ptrray::RayObject >, std::allocator<std::pair<ray::ObjectID, std::shared_ptrray::RayObject > > >, std::shared_ptrray::LocalMemoryBuffer&, bool*, bool*, std::vector<ray::ConcurrencyGroup, std::allocatorray::ConcurrencyGroup > const&, std::string, bool), ray::Status ()(ray::rpc::Address const&, ray::rpc::TaskType, std::string, ray::core::RayFunction const&, std::unordered_map<std::string, double, std::hashstd::string, std::equal_tostd::string, std::allocator<std::pair<std::string const, double> > > const&, std::vector<std::shared_ptrray::RayObject, std::allocator<std::shared_ptrray::RayObject > > const&, std::vector<ray::rpc::ObjectReference, std::allocatorray::rpc::ObjectReference > const&, std::string, std::string, std::vector<std::pair<ray::ObjectID, std::shared_ptrray::RayObject >, std::allocator<std::pair<ray::ObjectID, std::shared_ptrray::RayObject > > >, std::vector<std::pair<ray::ObjectID, std::shared_ptrray::RayObject >, std::allocator<std::pair<ray::ObjectID, std::shared_ptrray::RayObject > > >, std::shared_ptrray::LocalMemoryBuffer&, bool, bool*, std::vector<ray::ConcurrencyGroup, std::allocatorray::ConcurrencyGroup > const&, std::string, bool)>::_M_invoke (ray/_raylet.so)
ray::core::CoreWorker::ExecuteTask (ray/_raylet.so)
std::_Function_handler<ray::Status(ray::TaskSpecification const&, std::shared_ptr<std::unordered_map<std::string, std::vector<std::pair<long, double>, std::allocator<std::pair<long, double> > >, std::hashstd::string, std::equal_tostd::string, std::allocator<std::pair<std::string const, std::vector<std::pair<long, double>, std::allocator<std::pair<long, double> > > > > > >, std::vector<std::pair<ray::ObjectID, std::shared_ptrray::RayObject >, std::allocator<std::pair<ray::ObjectID, std::shared_ptrray::RayObject > > >, std::vector<std::pair<ray::ObjectID, std::shared_ptrray::RayObject >, std::allocator<std::pair<ray::ObjectID, std::shared_ptrray::RayObject > > >, google::protobuf::RepeatedPtrFieldray::rpc::ObjectReferenceCount, bool, bool*), std::_Bind<ray::Status (ray::core::CoreWorker(ray::core::CoreWorker*, std::_Placeholder<1>, std::_Placeholder<2>, std::_Placeholder<3>, std::_Placeholder<4>, std::_Placeholder<5>, std::_Placeholder<6>, std::_Placeholder<7>)::)(ray::TaskSpecification const&, std::shared_ptr<std::unordered_map<std::string, std::vector<std::pair<long, double>, std::allocator<std::pair<long, double> > >, std::hashstd::string, std::equal_tostd::string, std::allocator<std::pair<std::string const, std::vector<std::pair<long, double>, std::allocator<std::pair<long, double> > > > > > > const&, std::vector<std::pair<ray::ObjectID, std::shared_ptrray::RayObject >, std::allocator<std::pair<ray::ObjectID, std::shared_ptrray::RayObject > > >, std::vector<std::pair<ray::ObjectID, std::shared_ptrray::RayObject >, std::allocator<std::pair<ray::ObjectID, std::shared_ptrray::RayObject > > >, google::protobuf::RepeatedPtrFieldray::rpc::ObjectReferenceCount, bool*, bool*)> >::_M_invoke (ray/_raylet.so)
ray::core::CoreWorkerDirectTaskReceiver::HandleTask(ray::rpc::PushTaskRequest, ray::rpc::PushTaskReply*, std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>)::{lambda(std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>)#1}::operator() const (ray/_raylet.so)
std::_Function_handler<void (std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>), ray::core::CoreWorkerDirectTaskReceiver::HandleTask(ray::rpc::PushTaskRequest, ray::rpc::PushTaskReply*, std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>)::{lambda(std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>)#1}>::_M_invoke (ray/_raylet.so)
ray::core::InboundRequest::Accept (ray/_raylet.so)
ray::core::NormalSchedulingQueue::ScheduleRequests (ray/_raylet.so)
EventTracker::RecordExecution (ray/_raylet.so)
std::_Function_handler<void (), instrumented_io_context::post(std::function<void ()>, std::string)::{lambda()#1}>::_M_invoke (ray/_raylet.so)
boost::asio::detail::completion_handler<std::function<void ()>, boost::asio::io_context::basic_executor_type<std::allocator, (unsigned int)0> >::do_complete (ray/_raylet.so)
boost::asio::detail::scheduler::do_run_one (ray/_raylet.so)
boost::asio::detail::scheduler::run (ray/_raylet.so)
boost::asio::io_context::run (ray/_raylet.so)
ray::core::CoreWorker::RunTaskExecutionLoop (ray/_raylet.so)
ray::core::CoreWorkerProcessImpl::RunWorkerTaskExecutionLoop (ray/_raylet.so)
ray::core::CoreWorkerProcess::RunTaskExecutionLoop (ray/_raylet.so)
run_task_loop (ray/_raylet.so)
main_loop (ray/_private/worker.py:763)
(ray/_private/workers/default_worker.py:231)
Thread 38489 (idle): "ray_import_thread"
do_futex_wait (libpthread-2.26.so)
__new_sem_wait_slow (libpthread-2.26.so)
PyThread_acquire_lock_timed (libpython3.7m.so.1.0)
wait (threading.py:300)
_wait_once (grpc/_common.py:112)
wait (grpc/_common.py:157)
result (grpc/_channel.py:735)
_poll_locked (ray/_private/gcs_pubsub.py:249)
poll (ray/_private/gcs_pubsub.py:385)
_run (ray/_private/import_thread.py:70)
run (threading.py:870)
_bootstrap_inner (threading.py:926)
_bootstrap (threading.py:890)
clone (libc-2.26.so)
Thread 38624 (idle): "Thread-2"
do_futex_wait (libpthread-2.26.so)
__new_sem_wait_slow (libpthread-2.26.so)
PyThread_acquire_lock_timed (libpython3.7m.so.1.0)
wait (threading.py:300)
wait (threading.py:552)
run (tqdm/_monitor.py:60)
_bootstrap_inner (threading.py:926)
_bootstrap (threading.py:890)
clone (libc-2.26.so)
Thread 47463 (idle): "Thread-5"
epoll_wait (libc-2.26.so)
0x7f8ecc95afae (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
0x7f8ecc95f1a2 (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
0x7f8ecc96338f (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
0x7f8ecca53e47 (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
0x7f8eccac12b5 (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
0x7f8eccac1479 (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
0x7f8eccb597b8 (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
0x7f8eccb59f5f (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
0x7f8eccb5bdb5 (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
channel_spin (grpc/_channel.py:1258)
0x7f8eccaa340c (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
run (threading.py:870)
_bootstrap_inner (threading.py:926)
_bootstrap (threading.py:890)
clone (libc-2.26.so)

Stack dump for ec2-user 37803 0.8 0.2 64578740 453176 pts/1 SNl+ 22:09 0:11 ray::AIRRLTrainer.__init__()
Process 37803: ray::AIRRLTrainer.__init__()
Python v3.7.16 (/usr/bin/python3.7)

Thread 37803 (idle): "MainThread"
pthread_cond_timedwait@@GLIBC_2.3.2 (libpthread-2.26.so)
ray::core::GetRequest::Wait (ray/_raylet.so)
ray::core::CoreWorkerMemoryStore::GetImpl (ray/_raylet.so)
ray::core::CoreWorkerMemoryStore::Wait (ray/_raylet.so)
ray::core::CoreWorker::Wait (ray/_raylet.so)
wait (ray/_raylet.so)
wait (ray/_private/worker.py:2481)
wrapper (ray/_private/client_mode_hook.py:105)
fetch_until_complete (ray/data/_internal/progress_bar.py:74)
_apply (ray/data/_internal/compute.py:115)
__call__ (ray/data/_internal/plan.py:672)
execute (ray/data/_internal/plan.py:309)
__init__ (ray/data/dataset.py:217)
repartition (ray/data/dataset.py:810)
get_dataset_and_shards (ray/rllib/offline/dataset_reader.py:179)
__init__ (ray/rllib/evaluation/worker_set.py:131)
setup (ray/rllib/algorithms/algorithm.py:531)
_resume_span (ray/util/tracing/tracing_helper.py:466)
__init__ (ray/tune/trainable/trainable.py:161)
__init__ (ray/rllib/algorithms/algorithm.py:414)
__init__ (ray/train/rl/rl_trainer.py:214)
_resume_span (ray/util/tracing/tracing_helper.py:466)
actor_method_executor (ray/_private/function_manager.py:674)
function_executor (ray/_raylet.so)
_raylet_task_execution_handler (ray/_raylet.so)
std::_Function_handler<ray::Status(ray::rpc::Address const&, ray::rpc::TaskType, std::string, ray::core::RayFunction const&, std::unordered_map<std::string, double, std::hashstd::string, std::equal_tostd::string, std::allocator<std::pair<std::string const, double> > > const&, std::vector<std::shared_ptrray::RayObject, std::allocator<std::shared_ptrray::RayObject > > const&, std::vector<ray::rpc::ObjectReference, std::allocatorray::rpc::ObjectReference > const&, std::string const&, std::string const&, std::vector<std::pair<ray::ObjectID, std::shared_ptrray::RayObject >, std::allocator<std::pair<ray::ObjectID, std::shared_ptrray::RayObject > > >, std::vector<std::pair<ray::ObjectID, std::shared_ptrray::RayObject >, std::allocator<std::pair<ray::ObjectID, std::shared_ptrray::RayObject > > >, std::shared_ptrray::LocalMemoryBuffer&, bool*, bool*, std::vector<ray::ConcurrencyGroup, std::allocatorray::ConcurrencyGroup > const&, std::string, bool), ray::Status ()(ray::rpc::Address const&, ray::rpc::TaskType, std::string, ray::core::RayFunction const&, std::unordered_map<std::string, double, std::hashstd::string, std::equal_tostd::string, std::allocator<std::pair<std::string const, double> > > const&, std::vector<std::shared_ptrray::RayObject, std::allocator<std::shared_ptrray::RayObject > > const&, std::vector<ray::rpc::ObjectReference, std::allocatorray::rpc::ObjectReference > const&, std::string, std::string, std::vector<std::pair<ray::ObjectID, std::shared_ptrray::RayObject >, std::allocator<std::pair<ray::ObjectID, std::shared_ptrray::RayObject > > >, std::vector<std::pair<ray::ObjectID, std::shared_ptrray::RayObject >, std::allocator<std::pair<ray::ObjectID, std::shared_ptrray::RayObject > > >, std::shared_ptrray::LocalMemoryBuffer&, bool, bool*, std::vector<ray::ConcurrencyGroup, std::allocatorray::ConcurrencyGroup > const&, std::string, bool)>::_M_invoke (ray/_raylet.so)
ray::core::CoreWorker::ExecuteTask (ray/_raylet.so)
std::_Function_handler<ray::Status(ray::TaskSpecification const&, std::shared_ptr<std::unordered_map<std::string, std::vector<std::pair<long, double>, std::allocator<std::pair<long, double> > >, std::hashstd::string, std::equal_tostd::string, std::allocator<std::pair<std::string const, std::vector<std::pair<long, double>, std::allocator<std::pair<long, double> > > > > > >, std::vector<std::pair<ray::ObjectID, std::shared_ptrray::RayObject >, std::allocator<std::pair<ray::ObjectID, std::shared_ptrray::RayObject > > >, std::vector<std::pair<ray::ObjectID, std::shared_ptrray::RayObject >, std::allocator<std::pair<ray::ObjectID, std::shared_ptrray::RayObject > > >, google::protobuf::RepeatedPtrFieldray::rpc::ObjectReferenceCount, bool, bool*), std::_Bind<ray::Status (ray::core::CoreWorker(ray::core::CoreWorker*, std::_Placeholder<1>, std::_Placeholder<2>, std::_Placeholder<3>, std::_Placeholder<4>, std::_Placeholder<5>, std::_Placeholder<6>, std::_Placeholder<7>)::)(ray::TaskSpecification const&, std::shared_ptr<std::unordered_map<std::string, std::vector<std::pair<long, double>, std::allocator<std::pair<long, double> > >, std::hashstd::string, std::equal_tostd::string, std::allocator<std::pair<std::string const, std::vector<std::pair<long, double>, std::allocator<std::pair<long, double> > > > > > > const&, std::vector<std::pair<ray::ObjectID, std::shared_ptrray::RayObject >, std::allocator<std::pair<ray::ObjectID, std::shared_ptrray::RayObject > > >, std::vector<std::pair<ray::ObjectID, std::shared_ptrray::RayObject >, std::allocator<std::pair<ray::ObjectID, std::shared_ptrray::RayObject > > >, google::protobuf::RepeatedPtrFieldray::rpc::ObjectReferenceCount, bool*, bool*)> >::_M_invoke (ray/_raylet.so)
ray::core::CoreWorkerDirectTaskReceiver::HandleTask(ray::rpc::PushTaskRequest, ray::rpc::PushTaskReply*, std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>)::{lambda(std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>)#1}::operator() const (ray/_raylet.so)
std::_Function_handler<void (std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>), ray::core::CoreWorkerDirectTaskReceiver::HandleTask(ray::rpc::PushTaskRequest, ray::rpc::PushTaskReply*, std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>)::{lambda(std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>)#1}>::_M_invoke (ray/_raylet.so)
ray::core::InboundRequest::Accept (ray/_raylet.so)
ray::core::NormalSchedulingQueue::ScheduleRequests (ray/_raylet.so)
EventTracker::RecordExecution (ray/_raylet.so)
std::_Function_handler<void (), instrumented_io_context::post(std::function<void ()>, std::string)::{lambda()#1}>::_M_invoke (ray/_raylet.so)
boost::asio::detail::completion_handler<std::function<void ()>, boost::asio::io_context::basic_executor_type<std::allocator, (unsigned int)0> >::do_complete (ray/_raylet.so)
boost::asio::detail::scheduler::do_run_one (ray/_raylet.so)
boost::asio::detail::scheduler::run (ray/_raylet.so)
boost::asio::io_context::run (ray/_raylet.so)
ray::core::CoreWorker::RunTaskExecutionLoop (ray/_raylet.so)
ray::core::CoreWorkerProcessImpl::RunWorkerTaskExecutionLoop (ray/_raylet.so)
ray::core::CoreWorkerProcess::RunTaskExecutionLoop (ray/_raylet.so)
run_task_loop (ray/_raylet.so)
main_loop (ray/_private/worker.py:763)
(ray/_private/workers/default_worker.py:231)
Thread 38571 (idle): "ray_import_thread"
do_futex_wait (libpthread-2.26.so)
__new_sem_wait_slow (libpthread-2.26.so)
0x7ff2e13aa1a2 (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
0x7ff2e13ae38f (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
0x7ff2e149ee47 (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
0x7ff2e150c2b5 (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
0x7ff2e150c479 (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
0x7ff2e15a47b8 (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
0x7ff2e15a4f5f (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
0x7ff2e15a6db5 (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
channel_spin (grpc/_channel.py:1258)
0x7ff2e14ee40c (grpc/_cython/cygrpc.cpython-37m-x86_64-linux-gnu.so)
run (threading.py:870)
_bootstrap_inner (threading.py:926)
_bootstrap (threading.py:890)
clone (libc-2.26.so)

Can you also post the trial table and the output of ray status? How many CPUs are used by Ray?

It feels to me like the RLlib trainer init is hanging, and there is some Ray Dataset work going on.

My theory is that all the CPUs are taken by the RLlib trainers and none are left for Ray Datasets (yes, they are accounted for separately).

I would recommend limiting max_concurrent_trials in Tune to a lower number.

Or you can allocate fewer CPUs per RLlib trainer so that some are left for the dataset work to proceed.
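For example, a minimal sketch of the first suggestion. The names my_trainer, my_search_alg, and my_scheduler are placeholders for whatever you pass today, since I have not seen your script:

from ray import tune
from ray.tune import Tuner

tuner = Tuner(
    my_trainer,  # your RLlib/AIR trainer object
    tune_config=tune.TuneConfig(
        num_samples=40,
        # Cap concurrency below the point where every CPU ends up reserved,
        # so Ray Dataset tasks can still be scheduled.
        max_concurrent_trials=20,
        search_alg=my_search_alg,
        scheduler=my_scheduler,
    ),
)
result_grid = tuner.fit()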

cc @gjoliver

Here is the ray status output when the process hangs:

======== Autoscaler status: 2023-02-09 23:46:09.713074 ========
Node status

Healthy:
1 node_8cd90d012e891be5ed00303ed41b8708b021ea1e203cf3391e2b1b6d
Pending:
(no pending nodes)
Recent failures:
(no failures)

Resources

Usage:
44.0/96.0 CPU (44.0 used of 96.0 reserved in placement groups)
0.00/119.494 GiB memory
0.02/55.203 GiB object_store_memory

Demands:
{'CPU': 1.0}: 27+ pending tasks/actors
{'CPU': 1.0} * 3 (PACK): 1+ pending placement groups

Setting max_concurrent_trials=20 does work. But with max_concurrent_trials=40, ray status indicates that only 44 of the 96 cores are used, so I'm not sure why there aren't enough CPUs for Ray Tune to load the offline data. What am I missing?

Model training includes an off-policy evaluation step that loads a separate test dataset. Could that be the source of this issue?

Here is the code. Please let me know if there is a more efficient way to load the train and test data into Ray when training an RLlib model on offline data and performing off-policy evaluation with a separate test dataset.

def train(self):

    # load train data
    train_dataset = ray.data.read_json(self.offline_data_info['train_dir'])

    # create trainer
    trainer = self.create_trainer(train_dataset=train_dataset)

    # search algorithm
    search_algo = OptunaSearch(
        metric='evaluation/off_policy_estimator/doubly_robust_fitted_q_eval/v_target',
        mode='max'
    )

    # scheduler
    scheduler = ASHAScheduler(
        metric='evaluation/off_policy_estimator/doubly_robust_fitted_q_eval/v_target',
        mode='max',
        time_attr='training_iteration',
        max_t=5,
        grace_period=1
    )

    # create tuner
    tuner = Tuner(

        # trainer
        trainer,

        # create tune configuration
        tune_config=self.create_tune_config(
            search_algo=search_algo,
            scheduler=scheduler
        ),

        # hyper-parameters
        param_space=self.create_param_space(),

        # save checkpoint - run configuration doesn't work in Ray Air, use _tuner_kwargs to specify checkpoint settings
        _tuner_kwargs=dict(checkpoint_at_end=True),
    )

    # train models
    result_grid = tuner.fit()

    # convert content in result grid to pandas dataframe
    df_result = self.create_results_dataframe(result_grid=result_grid)

    return df_result

def create_trainer(self, train_dataset):

    return RLTrainer(

        # run config
        run_config=RunConfig(
            stop=dict(training_iteration=5),
            verbose=3
        ),

        # scaling config
        scaling_config=ScalingConfig(
            num_workers=1,
            use_gpu=False
        ),

        # train dataset
        datasets=dict(train=train_dataset),

        # algorithm
        algorithm='DQN',

        # config
        config=dict(
            action_space=self.action_space,
            observation_space=self.observation_space,
            framework='torch',
            evaluation_interval=1,
            evaluation_duration=10000,
            evaluation_duration_unit='episodes',
            evaluation_parallel_to_training=False,
            evaluation_num_workers=1,
            evaluation_config=dict(input=self.offline_data_info['test_dir']),

            # off-policy estimation
            off_policy_estimation_methods=dict(

                # doubly robust method
                doubly_robust_fitted_q_eval=dict(
                    type=DoublyRobust,
                    q_model_config=dict(
                        type=FQETorchModel,
                        model=[64]
                    )
                )
            )
        )
    )

def create_tune_config(self, search_algo, scheduler):

    return tune.TuneConfig(
        num_samples=40,
        max_concurrent_trials=40,
        search_alg=search_algo,
        scheduler=scheduler
    )

def create_param_space(self):

    return dict(
        lr=tune.loguniform(1e-6, 1e-3),
        observation_filter=tune.choice(['NoFilter', 'MeanStdFilter']),
        batch_mode=tune.choice(['truncate_episodes', 'complete_episodes']),
        train_batch_size=tune.choice([100, 200, 400]),
        model=dict(
            fcnet_activation=tune.choice(['relu', 'elu']),
            fcnet_hiddens=tune.choice(self.network_configurations)
        )
    )
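For reference, the snippet above appears to assume imports roughly along these lines (module paths as of Ray 2.1/2.2; treat them as my assumption rather than part of the original script):

import ray
from ray import tune
from ray.tune import Tuner
from ray.tune.search.optuna import OptunaSearch
from ray.tune.schedulers import ASHAScheduler
from ray.air.config import RunConfig, ScalingConfig
from ray.train.rl import RLTrainer
from ray.rllib.offline.estimators import DoublyRobust
from ray.rllib.offline.estimators.fqe_torch_model import FQETorchModel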

What is your overall CPU usage when doing 20 trials at a time?
Looking at your ray status output, it says 44.0/96.0 CPU (44.0 used of 96.0 reserved in placement groups). So although only 44 CPUs are in use, all 96 are already reserved, leaving no additional CPUs for Ray Datasets, which causes the issue.
If you feel like the system is not well utilized when running just 20 trials, you may want to override the default resource request for the DQN trainer, so that from Ray's scheduling perspective each Trainer uses fewer resources. More trials can then fit into the 96 cores while still leaving some CPUs for the dataset work.
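To make that concrete, here is a hedged sketch of one way to shrink the per-trial reservation through the RLlib config. num_cpus_per_worker is a standard RLlib config key, but the fraction below is only illustrative and not something I have benchmarked against your workload:

config = dict(
    # With num_workers=1 and evaluation_num_workers=1, each trial otherwise
    # reserves roughly 1 (local worker) + 1 (rollout worker) + 1 (eval worker) CPUs.
    num_cpus_per_worker=0.5,  # assumed value: fractional CPU per rollout worker (and, I believe, eval worker)
    # ... keep the rest of the DQN config from the script above ...
)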
Off-policy evaluation is pretty common for offline RL, so I would expect RLlib to support it, and we may even have an example for it. But I will leave it to the RLlib folks to comment here. Tagging a few of them: @gjoliver @kourosh

Ray status generates the following when num_samples=40 and max_concurrent_trials=20. In this scenario the process also hangs.

======== Autoscaler status: 2023-02-10 02:26:01.118026 ========
Node status

Healthy:
1 node_37f8461f77118647728ca8112a71dc4df2a0744ba994ae01cd6c16b5
Pending:
(no pending nodes)
Recent failures:
(no failures)

Resources

Usage:
46.0/96.0 CPU (46.0 used of 96.0 reserved in placement groups)
0.00/119.067 GiB memory
0.03/55.020 GiB object_store_memory

Demands:
{'CPU': 1.0}: 26+ pending tasks/actors
{'CPU': 1.0} * 3 (PACK): 1+ pending placement groups

Ray status generates the following when num_samples=20 and max_concurrent_trials=20. In this scenario model training executes successfully.

======== Autoscaler status: 2023-02-10 02:30:16.729754 ========
Node status

Healthy:
1 node_888f7311961ee1c9bf1a4942162def6837a527eb597ddf4d7de41fa3
Pending:
(no pending nodes)
Recent failures:
(no failures)

Resources

Usage:
60.0/96.0 CPU (60.0 used of 60.0 reserved in placement groups)
0.00/119.068 GiB memory
0.08/55.020 GiB object_store_memory

Demands:
(no resource demands)

I thought setting num_workers=1 in ScalingConfig meant that one CPU was allocated for each trial, but that doesn't seem to be the case. What is the formula for calculating how many CPUs will be reserved/used by Ray Tune as a function of the num_samples and max_concurrent_trials settings?

Is the number of CPUs reserved by Ray Tune also affected by the fact that this RLlib model is trained on an offline train dataset and performs off-policy evaluation with a separate offline test dataset?

Setting max_concurrent_trials to 1 forces Tune to run one trial at a time; if you do this, ray status will tell you how many CPUs the base trial uses. Setting max_concurrent_trials to any other number caps the number of concurrent trials, and your CPU utilization should be roughly num_concurrent_trials * CPUs per trial. Please note that since we use Ray Datasets for evaluation, the number of CPUs actually used may change dynamically based on available resources. As long as at least one CPU is left for Ray Datasets, the workload should not hang; but if we are unlucky and set the trial count so that no resources are left for Ray Datasets, it will cause a hang. As far as I know, there is currently no safeguard against this kind of resource starvation when many Tune trials that include Ray Dataset operations are run concurrently.
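A rough way to pick that cap, sketched below: measure the per-trial reservation by running a single trial (as suggested above), then leave explicit headroom for the Ray Dataset tasks. The specific numbers here are assumptions for illustration:

import ray

# Connect to the already running cluster.
ray.init(address="auto")

total_cpus = ray.cluster_resources().get("CPU", 0)
cpus_per_trial = 3   # read off ray status when running with max_concurrent_trials=1
headroom = 6         # CPUs deliberately left free for Ray Dataset read/repartition tasks

max_concurrent_trials = int((total_cpus - headroom) // cpus_per_trial)
print(max_concurrent_trials)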

cc @Clark_Zinzow for visibility on the issue.

ray status generates the following when num_samples=1 and max_concurrent_trials=1, which indicates that Ray Tune reserves 3 CPUs for each trial. Hence, if a machine has 96 CPUs available, then to avoid this resource starvation issue max_concurrent_trials should be set to at most 30, which would leave 6 CPUs for other activities such as Ray Dataset operations. Is that correct?

======== Autoscaler status: 2023-02-10 20:24:19.842377 ========
Node status

Healthy:
1 node_817771a5c4064da5444e737e58663fd1c6dc5e9968614d4d7ca8843a
Pending:
(no pending nodes)
Recent failures:
(no failures)

Resources

Usage:
2.0/96.0 CPU (2.0 used of 3.0 reserved in placement groups)
0.00/119.072 GiB memory
0.00/55.022 GiB object_store_memory

Demands:
(no resource demands)

Yep. So with your setup the max concurrent number should be 30. But you can increase that by requesting fractional CPU resources for each trial; for example, half a CPU per worker would roughly double your max_concurrent cap. At some point, though, it may become a problem. Here is a comment I wrote a while back regarding the complications that exist in offline RL resource management: ray/algorithm.py at master · ray-project/ray · GitHub
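To make the arithmetic explicit, a back-of-the-envelope sketch using the 3-CPU-per-trial reservation observed above. The fractional split is an assumption about which parts of the bundle get halved, not a tested configuration:

total_cpus = 96
headroom = 6                             # CPUs kept free for Ray Dataset tasks

per_trial_default = 1 + 1 + 1            # local + rollout + eval worker = 3 CPUs (observed)
print(int((total_cpus - headroom) // per_trial_default))   # 30

per_trial_halved = 0.5 + 0.5 + 0.5       # assumed: 0.5 CPU requested for each of the three workers
print(int((total_cpus - headroom) // per_trial_halved))    # 60, i.e. roughly double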