How severely does this issue affect your experience of using Ray?
- High: It blocks me from completing my task.
I am using Ray version 1.13.0. I am running the script below with workflow storage configured to use S3.
from time import sleep
import traceback
import ray
from ray import workflow
from ray.workflow.common import WorkflowNotFoundError

@workflow.step
def download(filename):
    print(f"Downloading: {filename}")
    return f"Downloaded: {filename}"


@workflow.step
def process(file_1, file_2):
    print("Processing files...")
    return f"Processed: {file_1}, {file_2}"

@workflow.step
def pipeline(file_1, file_2):
    output_1 = None
    output_2 = None

    # Check whether a download workflow for file_1 already exists;
    # if not, launch one asynchronously.
    try:
        status = workflow.get_status(workflow_id=f"down-workflow-{file_1}")
        print(f"Download workflow STATUS {file_1}: {status}")
    except WorkflowNotFoundError:  # TODO
        print("Workflow doesn't exist.")
        down1 = download.step(file_1)
        down1_pro = down1.run_async(workflow_id=f"down-workflow-{file_1}")

    # Same check and launch for file_2.
    try:
        status = workflow.get_status(workflow_id=f"down-workflow-{file_2}")
        print(f"Download workflow STATUS {file_2}: {status}")
    except WorkflowNotFoundError:
        print("Workflow doesn't exist.")
        down2 = download.step(file_2)
        down2_pro = down2.run_async(workflow_id=f"down-workflow-{file_2}")

    # Fetch the outputs of both download workflows by workflow id.
    output_1 = ray.get(workflow.get_output(workflow_id=f"down-workflow-{file_1}"))
    print(f"OUTPUT_1: {output_1}")
    output_2 = ray.get(workflow.get_output(workflow_id=f"down-workflow-{file_2}"))
    print(f"OUTPUT_2: {output_2}")

    out_file_1 = output_1
    out_file_2 = output_2

    # Run the process workflow on the two download outputs and wait for it.
    pro = process.step(out_file_1, out_file_2)
    process_exe = pro.run_async(workflow_id=f"process-workflow-{out_file_1}_{out_file_2}")
    ready, _ = ray.wait([process_exe])
    final_output = ray.get(ready)
    print(f"process workflow-output: {final_output}")
    return final_output

def create_batches(l, batch_count):
    batches = []
    offset = batch_count
    for itr in range(0, len(l), offset):
        batches.append(l[itr:itr + offset])
    return batches

def main():
    pipeline_lst = []
    wait_list = []

    # Two pipeline workflows; note that "b.txt" appears in both.
    pipeline_1 = pipeline.step("a.txt", "b.txt")
    workflow_id = "pipeline_workflow_a.txt_b.txt"
    pipeline_lst.append((pipeline_1, workflow_id))

    pipeline_2 = pipeline.step("b.txt", "c.txt")
    workflow_id = "pipeline_workflow_b.txt_c.txt"
    pipeline_lst.append((pipeline_2, workflow_id))

    # Submit the pipelines in batches of up to 3 and wait for each batch to finish.
    batches = create_batches(l=pipeline_lst, batch_count=3)
    for batch in batches:
        for step, workflow_id in batch:
            wait_list.append(step.run_async(workflow_id=workflow_id))

        batch_size = 3 if len(wait_list) > 3 else len(wait_list)
        while len(wait_list) > 0:
            ready, wait_list = ray.wait(wait_list, num_returns=batch_size)
            batch_size = 3 if len(wait_list) > 3 else len(wait_list)
            out = ray.get(ready)
            print(f"Workflow_Output FINAL: {out}")

if __name__ == "__main__":
    if not ray.is_initialized():
        ray.init(storage="s3://stgrayworkflow/consumer")
        workflow.init()
    main()
    ray.shutdown()
The problem I am facing: when I execute the script, it runs in async mode as expected, but the folder structure created in the S3 bucket is not what I expect. Under the download workflow at down-workflow-b.txt/steps/ I get two step directories, __main__.download and __main__.download_1. The ray.get call tries to access __main__.download_1, which never actually ran, yet the process still tries to fetch its output, which eventually fails with the following error:
File "/data/anaconda3/envs/openpit_monitoring/lib/python3.9/site-packages/ray/workflow/step_executor.py", line 46, in _resolve_static_workflow_ref
workflow_ref = ray.get(workflow_ref.ref)
File "/data/anaconda3/envs/openpit_monitoring/lib/python3.9/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
return func(*args, **kwargs)
File "/data/anaconda3/envs/openpit_monitoring/lib/python3.9/site-packages/ray/worker.py", line 1831, in get
raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ValueError): ray::load() (pid=2866, ip=172.31.22.217)
File "/data/anaconda3/envs/openpit_monitoring/lib/python3.9/site-packages/ray/workflow/workflow_access.py", line 352, in load
raise ValueError(
ValueError: Cannot load output from step id __main__.download_1 in workflow down-workflow-b.txt
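To double-check what I see in the bucket, I list the step directories with a small boto3 snippet along these lines (just a sketch: it assumes boto3 is installed, AWS credentials are configured in the environment, and that the workflow data lives directly under the "consumer" prefix from the ray.init call; the exact key layout may differ):

import boto3

# Sketch: list the step directories Ray created for the b.txt download workflow.
# Bucket and prefix are taken from the storage URI passed to ray.init above.
s3 = boto3.client("s3")
resp = s3.list_objects_v2(
    Bucket="stgrayworkflow",
    Prefix="consumer/down-workflow-b.txt/steps/",
    Delimiter="/",
)
for entry in resp.get("CommonPrefixes", []):
    # This is where both __main__.download/ and __main__.download_1/ show up.
    print(entry["Prefix"])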
Surprisingly, this all works fine when a local storage path is given. The error only occurs when the storage is S3.
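For comparison, the only change in the working run is the storage URI passed to ray.init (the local path here is just an example):

if __name__ == "__main__":
    if not ray.is_initialized():
        # Same script, but with workflow storage on the local filesystem.
        ray.init(storage="file:///tmp/ray_workflow_storage")
        workflow.init()
    main()
    ray.shutdown()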
I am looking for the reason behind this error and a possible solution.
Thanks in Advance
Harshal