Stream data from external executable

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

How to interact with external code?

I’m interested in using Ray for training an ANN on data that are generated on the fly by an external program. This program is slow to compute and generates vast amounts of data that cannot all be stored, hence the need for online generation and training.
I would like to know whether Ray provides an API to run external code that is not Python based but a standalone executable. The executable is a long-running task that emits data progressively. I want to stream those data and train the network in parallel.

Related Points

Is it possible to coax your external program into writing its data to a sequence of files? Ray has a data pipeline API that can be used to define this kind of data stream for training.

I would try something like this:
(1) Get your external program to write output in chunks to /tmp/output-{i} in sequence.
(2) In Ray, launch the program as a subprocess running in the background.
(3) Define a DatasetPipeline that waits for and reads the /tmp/output-{i} files in sequence (optionally deleting each file after it has been read).

import ray
import os
import time
from ray.data.dataset_pipeline import DatasetPipeline

def read_output():
    # Suppose the process is putting data in /tmp/output-{i}
    i = 0
    while True:
        next_file = f"/tmp/output-{i}"
        # Wait for next output file to appear.
        while not os.path.exists(next_file):
            time.sleep(1)
        print("Got output file", next_file)
        # Yield a callable that creates a Dataset from the file; bind the file
        # name now so later calls don't pick up a newer value of next_file.
        # (read_binary_files, read_numpy, etc. would also work here.)
        yield lambda f=next_file: ray.data.read_csv([f])
        i += 1

#
# ... code to subprocess.Popen() your data creation process ...
#
pipe = DatasetPipeline.from_iterable(read_output())
# Consume generated data.
for x in pipe.iter_rows():
    print(x)
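
For step (2), a minimal sketch of launching the generator in the background could look like the following; the executable name and its arguments are placeholders for whatever your program actually is:

import subprocess

# Hypothetical external generator that writes /tmp/output-0, /tmp/output-1, ...
proc = subprocess.Popen(["./my_generator", "--out-dir", "/tmp"])

# ... build and consume the pipeline as shown above ...

# Stop the generator once training is done.
proc.terminate()
proc.wait()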

The DatasetPipeline.from_iterable() API is pretty flexible, so you can also imagine other ways of connecting data that isn’t file based: as long as you can turn the source into an iterator/generator of Datasets, it can be turned into a pipeline.
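
For instance, if the external program can emit records on stdout instead of files, a rough sketch (assuming a simple line-per-record format, purely for illustration) could batch those records into in-memory Datasets:

import subprocess
import ray
from ray.data.dataset_pipeline import DatasetPipeline

def read_stdout(proc, batch_size=1000):
    batch = []
    # Read records line by line from the generator's stdout.
    for line in proc.stdout:
        batch.append({"value": line.decode().strip()})
        if len(batch) == batch_size:
            # Bind the current batch so each callable builds one in-memory Dataset.
            yield lambda b=batch: ray.data.from_items(b)
            batch = []
    if batch:
        yield lambda b=batch: ray.data.from_items(b)

proc = subprocess.Popen(["./my_generator"], stdout=subprocess.PIPE)
pipe = DatasetPipeline.from_iterable(read_stdout(proc))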

There’s some more about pipelines here: Pipelining Compute — Ray 1.12.0. Hope this helps!

Thank you for your answer.
One motivation for this approach is to avoid the I/O bottleneck of writing massive amounts of data to disk. Still, if we manage to expose the generated data to Python, we can use a generator as you suggest.
Additionally, I understood that Ray offers the ability to manage the resources used by each function. So using Ray I could specify the allocation for my data generation. Is that correct? Assume the data generation is a parallelised program for which I would like to allocate 4 processes and enforce fault tolerance.

So using Ray I could specify the allocation for my data generation. Is that correct? Assume the data generation is a parallelised program for which I would like to allocate 4 processes and enforce fault tolerance.

Yes (assuming your program is parallelized in Ray). If you’re parallelizing it with Ray actors, you could create 4 Ray actors and use @ray.remote(num_cpus=1) to tell the scheduler to reserve a CPU per actor. If the program isn’t parallelized with Ray, you can still “reserve” some CPUs for it by creating a “dummy” Ray actor with num_cpus=4 and launching the 4-core computation from that actor; the actor itself doesn’t use those CPUs, but the reservation keeps Ray from scheduling other work onto them.
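
A rough sketch of that “dummy” actor pattern, with the executable path as a placeholder, might look like this:

import subprocess
import ray

@ray.remote(num_cpus=4)
class DataGenerator:
    # The actor does little itself; declaring num_cpus=4 makes the scheduler
    # reserve 4 CPUs on the node where the external program will run.
    def run(self, cmd):
        # Block until the external 4-core program finishes.
        return subprocess.run(cmd).returncode

gen = DataGenerator.remote()
status = gen.run.remote(["./my_generator", "--threads", "4"])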

For fault tolerance, actors have the ability to auto-restart via max_restarts=; I’d check out the docs here: https://docs.ray.io/en/latest/ray-core/actors/fault-tolerance.html
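
Building on the sketch above, the restart behavior is just an extra option on the actor decorator (the restart count here is an arbitrary choice):

# Auto-restart the actor up to 3 times if its process dies; -1 means unlimited restarts.
@ray.remote(num_cpus=4, max_restarts=3)
class DataGenerator:
    def run(self, cmd):
        return subprocess.run(cmd).returncode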