Using ray for data processing

How severely does this issue affect your experience of using Ray?

  • None: Just asking a question out of curiosity

Hi,
I would like to learn how to use Ray for data processing (MapReduce, partitioning, etc.).

  1. Are there any getting-started tutorials for large-scale datasets (word count, grep, top N, the exact task doesn't matter)? What's important is that it isn't a toy example and that it processes data larger than memory.
  2. When I run a process operating on data larger than memory, does that mean object spilling will occur, and do I need to provide configuration via object_spilling_config?

Thanks!

cc @Clark_Zinzow. Can you address this question?

Hi @Roman_Gershman, thanks for the questions!

Ray’s ecosystem of libraries has a few options for Ray-based data processing, offering higher-level abstractions (e.g. datasets, dataframes, tensors) geared towards ML, ETL, and data science use cases, which hide the distributed nature of the underlying execution:

  • Ray Datasets - our own Ray library for data processing in ML pipelines, namely data loading, preprocessing, ingestion into distributed trainers, and parallel batch inference. This is also the data processing engine behind the higher-level Ray AIR library.
  • Spark-on-Ray - Run large-scale Spark data processing workloads on a Ray cluster.
  • Modin - A Ray-based distributed DataFrame library.
  • Mars-on-Ray - A distributed tensor framework on Ray.
  • Dask-on-Ray - Run any Dask workloads on a Ray cluster.

To answer your specific questions:

  1. There are a few lower-level Ray Core examples that you may find interesting here, and I was able to dig up an old toy streaming MapReduce example. Doing something like distributed word count or grep is pretty trivial with any of the above libraries; with Ray Datasets it could be as simple as:
import ray

# Word count: count the words on each line, then sum across the dataset.
wc = ray.data.read_text("s3://big_text/").map(
    lambda line: len(line.split())).sum()

# Grep: emit every word that contains the pattern.
pattern = "foo"
occurrences = ray.data.read_text("s3://big_text/") \
                      .flat_map(lambda line: line.split()) \
                      .flat_map(
                          lambda word: [word] if pattern in word else [])
  2. If the operation is streaming (i.e. you only load one window of data into the cluster at a time and release it before starting to process the next window), you can process a dataset larger than memory entirely in memory. Otherwise, if the Ray object store is nearing its capacity, Ray will automatically spill data to the local disk of the Ray worker nodes; this works out of the box without any configuration. More details about Ray’s object spilling can be found here: Object Spilling — Ray 3.0.0.dev0
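If you do want to customize spilling (e.g. point it at a faster disk), you can pass object_spilling_config to ray.init. A minimal sketch, where the spill directory path is just an example:

```python
import json

import ray

# Spill objects to a custom local directory instead of the default temp dir.
ray.init(
    _system_config={
        "object_spilling_config": json.dumps(
            {"type": "filesystem",
             "params": {"directory_path": "/tmp/ray_spill"}}
        )
    },
)
```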
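As a plain-Python sanity check of the per-line word-count logic used in the Datasets snippet above (no cluster needed; the sample lines are made up):

```python
# Each element stands in for one line of the text dataset.
lines = ["the quick brown fox", "jumps over", "the lazy dog"]

# Same map-then-sum logic as the Datasets word count.
word_count = sum(len(line.split()) for line in lines)
print(word_count)  # 4 + 2 + 3 = 9
```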