Abysmal perf reading text with Ray Data - how do I do it right?

I'm trying a very simple experiment with Ray Data to make sure I'm using it correctly, and I'm getting abysmal performance.

I started with 10 files with ~10 MB of rows:

```
  100000 1.txt
  100000 10.txt
  100000 2.txt
  100000 3.txt
  100000 4.txt
  100000 5.txt
  100000 6.txt
  100000 7.txt
  100000 8.txt
  100000 9.txt
 1000000 total
```

The data is simple, like so:

```
1.txt:363: 01234 56789 01234 56789 01234 56789 01234 56789 01234 56789 01234 56789 01234 56789 01234 56789 01234 56789 01234 56789
1.txt:364: abcde fghij klmno pqrst uvwxy zabcd efghi jklmn opqrs tuvwx yz
```

I use this code:

```
import glob

import pandas as pd
import ray

# Use glob to create a list of file paths with .txt extension in the data directory
text_file_paths = glob.glob(f"{PATH_TO_DATA}/*.txt")

# Pass the list of file paths to read_text
datasets = ray.data.read_text(text_file_paths, include_paths=True, parallelism=4)

def func(x: pd.DataFrame) -> pd.DataFrame:
    # Combine each (path, text) pair into a single string and return a
    # DataFrame with a single named column
    return pd.DataFrame(
        [f"{path}: {text}" for path, text in zip(x['path'], x['text'])],
        columns=['data'],
    )

result = datasets.map_batches(func, batch_size=4096 * 10, num_cpus=4, zero_copy_batch=True)

i = 0
for row in result.iter_rows():
    i += 1
    if i % 10000 == 0:
        print(row)
```

This is on a MacBook Pro M2 Max, thus 4+4 on the CPUs, giving it all the performance cores. The code works, but it is exceedingly slow, taking almost 3 full minutes to execute for the 1M rows. The exact same work running without Ray on a single core (roughly the baseline sketched below) takes 30s to complete.
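For reference, the single-core baseline was along these lines; I'm reconstructing it from memory, so treat the details as approximate rather than exactly what I ran:

```
import glob
import time

# Plain single-process baseline: read each .txt file line by line, prefix
# each line with its path, and print every 10,000th row -- no Ray involved.
# PATH_TO_DATA is the same placeholder directory used above.
start_time = time.time()
i = 0
for path in glob.glob(f"{PATH_TO_DATA}/*.txt"):
    with open(path) as f:
        for line in f:
            row = f"{path}: {line.rstrip()}"
            i += 1
            if i % 10000 == 0:
                print(row)
print(f"done in {time.time() - start_time:.1f}s over {i} rows")
```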

What am I doing wrong? Thanks; I'm just getting started and experimenting, but not yet grokking my errors well.

Apparently the iter_rows() loop is the culprit and makes it very slow. I modified the code to do this instead:

```
import time

# Start the timer
start_time = time.time()
result = datasets.map_batches(func, batch_size=4096, num_cpus=4, zero_copy_batch=True)

print(f"result is {result}")

result.random_shuffle().write_json("/var/datasets/random/shuffled")
```

This took <3s, and the shuffled output did indeed contain all of the data. So I guess something about how I used iter_rows() is the issue. I tried adding prefetch_blocks=10000 and that didn't help either (or maybe it helped modestly). An int increment, a modulo, and an occasional print should obviously be negligible, so something about iter_rows() here is unexpectedly slow. With a prefetch of 50k I think I ran out of file descriptors (macOS), which I suspect is a hint.
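One thing I haven't tried yet but plan to: pulling data off the dataset in batches instead of individual rows, on the theory that it's the per-row overhead of iter_rows() that hurts. A rough, untested sketch using iter_batches() (a different call than the one I used above):

```
# Iterate in pandas batches rather than row by row; rows are still counted
# one at a time on the driver, but they are fetched from the object store a
# whole batch at a time instead of individually.
i = 0
for batch in result.iter_batches(batch_size=4096, batch_format="pandas"):
    for row in batch["data"]:
        i += 1
        if i % 10000 == 0:
            print(row)
```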

I also then rewrote this with a second map_batches (batch size 100k) into a function that converts each batch to a pandas DataFrame and does the same %10000 print while iterating over it; that completed in 6s. A sketch is below.
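Reconstructed from memory, so the details may not be exactly what I ran, that second version looked roughly like this. Note the counter here is per-batch rather than global (it prints every 10,000th row within each batch), which is close enough for timing, and I call materialize() just to force the lazy pipeline to execute:

```
def print_every_10k(batch: pd.DataFrame) -> pd.DataFrame:
    # Print every 10,000th row of the batch from inside the task instead of
    # pulling rows back to the driver one at a time.
    for i, row in enumerate(batch["data"], start=1):
        if i % 10000 == 0:
            print(row)
    return batch

# Second map_batches stage over the already-mapped dataset; materialize()
# forces execution so the prints actually happen.
result.map_batches(print_every_10k, batch_format="pandas", batch_size=100_000).materialize()
```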

Appreciate any insight folks have; it has been fun figuring out some of these parts.