Unexpected behavior when using generators with Ray Dataset

How severely does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

Questions/Feedback?

Hi all, I’m a beginner with Ray, so my question may be very simple, or I may be doing something really wrong.

What I’m trying to do is load a jsonlines dataset using Ray Dataset. The dataset is composed of multiple jsonlines files, each with multiple lines.

At first I was doing something like this (just a snippet, loading small jsonlines files, for demonstration purposes):

import json
from pathlib import Path

import pandas as pd
import ray
import ray.data


def load_jsonlines(filepath):
    with open(filepath, mode='rt', encoding='utf-8') as f:
        for jsonline in f:
            if jsonline.isspace():
                continue
            yield json.loads(jsonline)


def load(batch):
    result = []
    for file_path in batch:
        json_iter = load_jsonlines(file_path)
        split_filename = Path(file_path).stem
        for row in json_iter:
            result.append({"json": row, "factory": split_filename})
    return pd.DataFrame(result)

if __name__ == "__main__":

    nr_cores = 1  # int(os.getenv(NR_CORES_ENVVAR, DEFAULT_NR_CORES))
    ray.init(
        include_dashboard=True,
        dashboard_port=8265,
        num_cpus=nr_cores,
        _system_config={"automatic_object_spilling_enabled": False},
        local_mode=True
    )
    
    dataset_filepaths = ["../data/test/a.jsonlines", "../data/test/b.jsonlines"]
    ds = ray.data.from_items(dataset_filepaths).map_batches(load, zero_copy_batch=True)
    sample = ds.take(1)

and this worked, but it consumed too much RAM: some of the jsonlines files are very big and, loaded into memory, occupy over 20 GB of RAM.

So I wanted to switch to generators, and ended up with the following code (again just a snippet, loading small jsonlines files, for demonstration purposes):

import json
from pathlib import Path

import ray
import ray.data


def load_jsonlines(filepath):
    with open(filepath, mode='rt', encoding='utf-8') as f:
        for jsonline in f:
            if jsonline.isspace():
                continue
            yield json.loads(jsonline)


def load(batch):
    print(batch)
    for file_path in batch:
        print(file_path)
        json_iter = load_jsonlines(file_path)
        split_filename = Path(file_path).stem
        print(split_filename)
        for row in json_iter:
            print(f"Yielding {row}; name={split_filename}")
            yield [{"line": row, "name": split_filename}]



if __name__ == "__main__":

    nr_cores = 1  # int(os.getenv(NR_CORES_ENVVAR, DEFAULT_NR_CORES))
    ray.init(
        include_dashboard=True,
        dashboard_port=8265,
        num_cpus=nr_cores,
        _system_config={"automatic_object_spilling_enabled": False},
        local_mode=True
    )
    
    dataset_filepaths = ["../data/test/a.jsonlines", "../data/test/b.jsonlines"]
    ds = ray.data.from_items(dataset_filepaths).map_batches(load, zero_copy_batch=True, batch_size=1)
    sample = ds.take(1)

but when I call take, instead of loading just a single line into memory as I was expecting, it seems to yield and load into memory all the lines of the jsonlines file before actually performing the take, and at the end of the file (after yielding all elements of file a) it gives me a segmentation fault. This is the output I see:

How can I change this to be able to stream batches of jsonlines from my file without loading all the lines into memory (and without getting a segmentation fault)? I was expecting that my take(1) would only need to materialize a single line.
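
For reference, this is roughly the shape I was hoping to end up with (a sketch only; I'm assuming here that map_batches accepts a generator UDF that yields pandas DataFrame chunks, and that the batch arrives as a list of file paths as in my snippet above, which may differ in newer Ray versions with strict mode):

import json
from pathlib import Path

import pandas as pd
import ray
import ray.data


def load_chunked(batch, chunk_size=1000):
    # Yield bounded DataFrame chunks per file instead of single rows,
    # so no yielded batch ever holds a whole multi-GB file in memory.
    for file_path in batch:
        split_filename = Path(file_path).stem
        rows = []
        with open(file_path, mode='rt', encoding='utf-8') as f:
            for jsonline in f:
                if jsonline.isspace():
                    continue
                rows.append({"line": json.loads(jsonline), "name": split_filename})
                if len(rows) >= chunk_size:
                    yield pd.DataFrame(rows)
                    rows = []
        if rows:
            yield pd.DataFrame(rows)


dataset_filepaths = ["../data/test/a.jsonlines", "../data/test/b.jsonlines"]
ds = ray.data.from_items(dataset_filepaths).map_batches(load_chunked, batch_size=1)
sample = ds.take(1)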

EDIT: The segmentation fault seems to be caused by local_mode=True (when I remove it, the segfault goes away), but is that expected?

Hi @jpnn94 , welcome to the Ray discourse!

In general, we recommend using Ray Data methods to read in data whenever possible, in order to take advantage of efficiency improvements such as streaming execution and strict mode. For more end-to-end examples, I recommend checking out our user guides.

  • For reading in a collection of JSON files in a directory, I would recommend using the ray.data.read_json method. The method expects a list of JSON-formatted files, so you may need to modify the format of your input files to have a single JSON object per file instead of multiple objects in a single file (for example, you could make use of a nested directory structure to represent groupings of JSONs); see the sketch after this list.
  • The local_mode parameter is deprecated for ray.init in Ray 2.3 [docs], so we would recommend against using it.
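
Here is a minimal sketch of the read_json route mentioned above (the directory path is just a placeholder for wherever the reformatted JSON files end up; read_json also accepts an explicit list of file paths):

import ray
import ray.data

# Placeholder directory containing one JSON object per file.
ds = ray.data.read_json("../data/test_json/")
sample = ds.take(1)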

Let me know if the above is helpful, or if you have any follow-ups.