I was experimenting with throttling Ray execution and ensuring that tasks do not overrun a predefined duration. Here is my code:
import ray, time

ray.init()

@ray.remote
def busy(i):
    time.sleep(i)
    return i

# dictionary of execution times
executions = {}
ids = []
extime = 2
# start first 5 executions
current_time = time.time()
for x in range(0, 5):
    id = busy.remote(extime)
    ids.append(id)
    executions[id] = current_time
    extime = extime + 5

print('ids', ids)
print('executions', executions)

while(True):
    # Run every sec
    ready, not_ready = ray.wait(ids, timeout = 5.0)
    # adjust duration
    if extime >= 40:
        extime = 2
    # Nothing completed
    if not ready:
        continue
    current_time = time.time()
    # Process not_ready ones
    for key in not_ready:
        execution_time = current_time - executions[key]
        #print('key ', key, ' execution time ', execution_time)
        if execution_time >= 20.0:
            # we timed out
            print('execution ', key, ' is too long, canceling ')
            ray.cancel(key, force=True, recursive=True)
            ids.remove(key)
            executions.pop(key, 0)
            # add new execution
            id = busy.remote(extime)
            ids.append(id)
            executions[id] = current_time
            extime = extime + 5
    # Process ready ones
    for key in ready:
        print('complete execution of ', key, ' value ', ray.get(key), ' duration ', current_time - executions[key])
        executions.pop(key)
        ids.remove(key)
        # add new execution
        id = busy.remote(extime)
        ids.append(id)
        executions[id] = current_time
        extime = extime + 5

It seems to work fine, but because I cancel long-running Ray tasks, it gives me these warnings:
execution ObjectRef(480a853c2c4c6f27ffffffffffffffffffffffff0100000001000000) is too long, canceling
execution ObjectRef(623b26bdd75b28e9ffffffffffffffffffffffff0100000001000000) is too long, canceling
complete execution of ObjectRef(c6953afc4a9f69e9ffffffffffffffffffffffff0100000001000000) value 12 duration 12.00232195854187
2021-04-26 17:10:37,529 ERROR worker.py:78 – Unhandled error (suppress with RAY_IGNORE_UNHANDLED_ERRORS=1): The worker died unexpectedly while executing this task. Check python-core-worker-.log files for more information.
2021-04-26 17:10:37,529 ERROR worker.py:78 – Unhandled error (suppress with RAY_IGNORE_UNHANDLED_ERRORS=1): The worker died unexpectedly while executing this task. Check python-core-worker-.log files for more information.
complete execution of ObjectRef(402ddcfdf56ca87affffffffffffffffffffffff0100000001000000) value 17 duration 17.004695892333984
complete execution of ObjectRef(24eed4584329c19affffffffffffffffffffffff0100000001000000) value 2 duration 2.0068249702453613
Where exactly do I set RAY_IGNORE_UNHANDLED_ERRORS=1 to suppress these messages?
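
For reference, the log message names an environment variable, so one candidate place is the environment of the driver process, set before Ray is started. The snippet below is only a sketch of that idea. It assumes (not confirmed here) that Ray reads the variable from the driver's environment, and it uses a plain Python subprocess as a stand-in for the driver script, so it shows only that the value is visible to the process and inherited by its children.

```python
import os
import subprocess
import sys

# Assumption: Ray checks this variable in the driver process's environment,
# so it is set here before any code that would call ray.init().
os.environ["RAY_IGNORE_UNHANDLED_ERRORS"] = "1"

# Processes started from this one inherit the variable. A plain Python
# subprocess stands in for the driver script to show the value arrives.
child = subprocess.run(
    [
        sys.executable,
        "-c",
        "import os; print(os.environ.get('RAY_IGNORE_UNHANDLED_ERRORS'))",
    ],
    capture_output=True,
    text=True,
    check=True,
)
inherited = child.stdout.strip()
print(inherited)
```

The equivalent from a shell would be to prefix the launch command, for example RAY_IGNORE_UNHANDLED_ERRORS=1 python script.py, where script.py is a hypothetical name for the file above. Either way this would only hide the log lines; the cancelled tasks are still terminated as before.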