Ray Workflow storage on S3 unexpected behavior

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

I am using Ray version 1.13.0 and running the script below with workflow storage initialized on S3.

import ray
from ray import workflow
from ray.workflow.common import WorkflowNotFoundError

@workflow.step
def download(filename):
    print(f"Downloading: {filename}")
    return f"Downloaded: {filename}"

@workflow.step
def process(file_1, file_2):
    print(f"Processing files...")
    return f"Processed: {file_1}, {file_2}"

@workflow.step
def pipeline(file_1, file_2):
    output_1 = None
    output_2 = None

    try:
        status = workflow.get_status(workflow_id=f"down-workflow-{file_1}")
        print(f"Download workflow STATUS {file_1}: {status}")
    except WorkflowNotFoundError: #TODO
        print("Workflow doesn't exist.")
        down1 = download.step(file_1)
        down1_pro = down1.run_async(workflow_id=f"down-workflow-{file_1}")

    try:
        status = workflow.get_status(workflow_id=f"down-workflow-{file_2}")
        print(f"Download workflow STATUS {file_2}: {status}")
    except WorkflowNotFoundError:
        print("Workflow doesn't exist.")
        down2 = download.step(file_2)
        down2_pro = down2.run_async(workflow_id=f"down-workflow-{file_2}")
    
    output_1 = ray.get(workflow.get_output(workflow_id=f"down-workflow-{file_1}"))
    print(f"OUTPUT_1: {output_1}")
    output_2 = ray.get(workflow.get_output(workflow_id=f"down-workflow-{file_2}"))
    print(f"OUTPUT_2: {output_2}")
    out_file_1 = output_1
    out_file_2 = output_2
    
    pro = process.step(out_file_1, out_file_2)
    process_exe = pro.run_async(workflow_id=f"process-workflow-{out_file_1}_{out_file_2}")
    ready, _ = ray.wait([process_exe])
    final_output = ray.get(ready)
    print(f"process workflow-output:{final_output}")
    return final_output
        
def create_batches(items, batch_count):
    return [items[i:i + batch_count] for i in range(0, len(items), batch_count)]

def main():
    pipeline_lst = []
    wait_list = []

    pipeline_1 = pipeline.step("a.txt", "b.txt")
    workflow_id = "pipeline_workflow_a.txt_b.txt"
    pipeline_lst.append((pipeline_1, workflow_id))

    pipeline_2 = pipeline.step("b.txt", "c.txt")
    workflow_id = "pipeline_workflow_b.txt_c.txt"
    pipeline_lst.append((pipeline_2, workflow_id))

    batches = create_batches(items=pipeline_lst, batch_count=3)
    for batch in batches:
        for step, workflow_id in batch:
            wait_list.append(step.run_async(workflow_id=workflow_id))

        batch_size = min(3, len(wait_list))
        while wait_list:
            ready, wait_list = ray.wait(wait_list, num_returns=batch_size)
            batch_size = min(3, len(wait_list))
            out = ray.get(ready)
            print(f"Workflow_Output FINAL: {out}")


if __name__ == "__main__":
    if not ray.is_initialized():
        ray.init(storage="s3://stgrayworkflow/consumer")
        workflow.init()
    main()
    ray.shutdown()

The problem I am facing: when I execute the script, it runs in async mode as expected, but the folder structure created in the S3 bucket is not what I expect. Under the download workflow at down-workflow-b.txt/steps/, I get two step directories: __main__.download and __main__.download_1.

ray.get then tries to access __main__.download_1, which never actually ran, so fetching its output eventually fails with the following log:

  File "/data/anaconda3/envs/openpit_monitoring/lib/python3.9/site-packages/ray/workflow/step_executor.py", line 46, in _resolve_static_workflow_ref
    workflow_ref = ray.get(workflow_ref.ref)
  File "/data/anaconda3/envs/openpit_monitoring/lib/python3.9/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/data/anaconda3/envs/openpit_monitoring/lib/python3.9/site-packages/ray/worker.py", line 1831, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ValueError): ray::load() (pid=2866, ip=172.31.22.217)
  File "/data/anaconda3/envs/openpit_monitoring/lib/python3.9/site-packages/ray/workflow/workflow_access.py", line 352, in load
    raise ValueError(
ValueError: Cannot load output from step id __main__.download_1 in workflow down-workflow-b.txt

Surprisingly, this all works fine when a local storage path is given; the error only appears when the storage is S3.

I am looking for the cause of this error and how to resolve it.

Thanks in advance,
Harshal

This may be an older bug. Could you try upgrading to 2.0 or greater and see if you have the same issue? Thanks!
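For what it’s worth, if you do try 2.0 or later, note that the workflow API changed: @workflow.step is gone, and steps are plain @ray.remote tasks composed with .bind() and executed with workflow.run / workflow.run_async. A rough sketch of what your download step would look like (not a drop-in migration of your whole script; the storage URI is just copied from your example):

import ray
from ray import workflow

# In Ray >= 2.0 steps are regular @ray.remote tasks.
@ray.remote
def download(filename):
    print(f"Downloading: {filename}")
    return f"Downloaded: {filename}"

if __name__ == "__main__":
    ray.init(storage="s3://stgrayworkflow/consumer")
    # Build a DAG with .bind() and run it as a named workflow.
    result = workflow.run(download.bind("a.txt"), workflow_id="down-workflow-a.txt")
    print(result)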

Hi,

Thanks for the reply.

Actually, my underlying requirement is to prevent a single file from being downloaded multiple times.

Use case: there are three files, A, B, and C. They need to be pipelined as A-B and B-C. Each pipeline:

  1. Downloads its two files (e.g., A and B)
  2. Processes them

Now, in async mode, both pipelines (A-B and B-C) are executed concurrently, and I need to make sure that file B is not downloaded twice.

I am trying to achieve this using ray workflow.

In the script above, I have used the WorkflowNotFoundError exception-handling approach: if the “download workflow” for a given file does not exist yet, the pipeline creates it and runs the download step.
But I think this conflicts in async mode and eventually ends up in a race condition: both pipeline steps can see WorkflowNotFoundError for down-workflow-b.txt before either run_async call has registered that workflow, so both end up submitting a download step for the same file.

Can you please help? What would be the optimal approach for solving this problem?

I see, sorry about that, I misunderstood your question!

Hmm, I’m not totally sure why this is happening, but I believe it may be partly due to the fact that you’re mixing different workflows together. Each time you call run or run_async, it creates a distinct workflow, and you then have to coordinate those workflows correctly yourself. I think it will be tricky to get that right in general.

Here is an alternative approach that runs the entire pipeline in a single workflow. This way, the Ray workflow system can coordinate the pipeline for you and guarantee no race conditions or duplicate workflow executions.

Granted, the current workflows API makes it a bit tricky to express this kind of pipelined or iterative logic. The idea is to switch to a recursive model where we download some number of files in advance. Here is a version you can try out:

import ray
from ray import workflow


@ray.remote
def download(filename):
    print(f"Downloading: {filename}")
    return f"Downloaded: {filename}"

@ray.remote
def process(remaining_files, results, batch_size):
    # Download the first batch_size + 1 files.
    # We will process the first `batch_size` many results here.
    # The last one will get downloaded in parallel.
    for i, file in enumerate(remaining_files):
        if isinstance(file, str):
            remaining_files[i] = download.bind(file)
        if i >= batch_size + 1:
            break

    batch = remaining_files[:batch_size]
    for item in batch:
        if not isinstance(item, ray.ObjectRef):
            return workflow.continuation(process.bind(remaining_files, results, batch_size))

    final_output = ray.get(batch)
    print("Processing files...")
    results.append(f"Processed: {final_output}")

    # Pop the first file since we have finished processing all of its batches.
    remaining_files.pop(0)
    if remaining_files:
        return workflow.continuation(process.bind(remaining_files, results, batch_size))

    return results


def main():
    files = ["a.txt", "b.txt", "c.txt"]
    w = process.bind(files, [], 2)
    print(ray.workflow.run(w))


if __name__ == "__main__":
    if not ray.is_initialized():
        ray.init(storage="/tmp/workflow")
    main()

Just note that the workflows API is still in alpha, so unless you really need the durability aspect, it may be much simpler to express this kind of workload in the vanilla Ray API for now. But let me know how this works out for you, and if it doesn’t meet your needs, it would be great to figure out some future enhancements that we can add to the workflows API.
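For example, if you don’t need durability, the whole A-B / B-C use case can be expressed with plain Ray tasks and a shared ObjectRef for the file both pipelines need. Here is a minimal sketch reusing the download/process bodies from the scripts above (the dict of in-flight downloads is just one way to organize it):

import ray

@ray.remote
def download(filename):
    print(f"Downloading: {filename}")
    return f"Downloaded: {filename}"

@ray.remote
def process(file_1, file_2):
    print("Processing files...")
    return f"Processed: {file_1}, {file_2}"

def main():
    # One download task per unique file; the ObjectRef is shared by every
    # pipeline that needs the file, so nothing is downloaded twice.
    files = ["a.txt", "b.txt", "c.txt"]
    downloads = {f: download.remote(f) for f in files}

    # Pipelines A-B and B-C both reference the same ref for "b.txt".
    pipelines = [
        process.remote(downloads["a.txt"], downloads["b.txt"]),
        process.remote(downloads["b.txt"], downloads["c.txt"]),
    ]
    print(ray.get(pipelines))

if __name__ == "__main__":
    ray.init()
    main()
    ray.shutdown()

Because both process calls receive the same ObjectRef for b.txt, the object store guarantees the download task for that file runs only once and its result is simply reused.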