Possible memory leak when using Modin

How severe does this issue affect your experience of using Ray?

  • High: It blocks me to complete my task.

Hi, I’m new to Ray. I want to use it with Modin to process .csv files and save the result as a .pkl file (a minimal sketch of what I mean is just below the hardware notes). I’m following the code given on GitHub for this: https://github.com/Kaist-ICLab/K-EmoPhone_SupplementaryCodes/blob/main/analysis.ipynb. According to the notebook, it was tested on:

  • CPU: AMD Ryzen 9 5900X 12-Core (not mandatory; the code can run with fewer cores)
  • RAM: 128 GB (not mandatory; ~40 GB of RAM may be required (but not tested))
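
To make the goal concrete, here is a minimal sketch of the kind of conversion I mean. This is not the notebook’s actual code: the paths and the use of joblib for the dump are placeholders of mine.

import glob
import os

import joblib                 # placeholder for the notebook's dump()
import modin.pandas as pd     # drop-in pandas replacement, backed by Ray
import ray

ray.init(num_cpus=4)

# Read every per-user .csv file of one sensor and stack them into one dataframe.
frames = [pd.read_csv(p) for p in glob.glob("data/Acceleration/*.csv")]
data = pd.concat(frames, ignore_index=True)

# Save the combined dataframe as a pickle file.
joblib.dump(data, os.path.join("intermediate", "proc.pkl"))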

First, I tried to stick to the code:

def _process(data_type: str):
    data_raw = _load_data(data_type)
    data_proc = FUNC_PROC[data_type](data_raw)
    abbrev = DATA_TYPES[data_type]  # assumed: the abbreviation comes from the DATA_TYPES mapping
    result = dict()

    if type(data_proc) is dict:
        for k, v in data_proc.items():
            result[f'{abbrev}_{k}'] = v
    else:
        result[abbrev] = data_proc
    return result

with on_ray(num_cpus=12):
    jobs = []
    func = ray.remote(_process).remote
    for data_type in DATA_TYPES:
        job = func(data_type)
        jobs.append(job)
    jobs = ray.get(jobs)
    jobs = reduce(lambda a, b: {**a, **b}, jobs)
    dump(jobs, os.path.join(PATH_INTERMEDIATE, 'proc.pkl'))
    del jobs
    gc.collect()

In the source code, ray.init() is used to start Ray with no arguments, but in my case it only works with additional settings:

ray.init(
    num_cpus=4,
    object_store_memory=20 * 1024 * 1024 * 1024,  # mandatory
    _temp_dir="path/ray_temp",
    include_dashboard=False,
    ignore_reinit_error=True,
    _plasma_directory="/tmp"  # helps avoid running out of memory
)
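
As a sanity check that these settings are actually applied, printing the cluster resources right after ray.init() shows what Ray reserved (the exact dictionary keys may vary between Ray versions):

# Run right after the ray.init(...) call above.
print(ray.cluster_resources())    # total CPUs and the object store size Ray reserved
print(ray.available_resources())  # what is still free at this moment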

Shortening my explanation:
These two functions are the core of _process:

data_raw = _load_data(data_type)            # load one sensor's data (70 users' .csv files) into a dataframe
data_proc = FUNC_PROC[data_type](data_raw)  # run some calculations on it
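
For context, _load_data just collects the per-user .csv files of one sensor and stacks them into a single dataframe. A simplified reconstruction (not the exact notebook code; PATH_DATA is a placeholder for the raw-data directory):

import glob
import os

def _load_data(data_type: str) -> pd.DataFrame:
    # Simplified reconstruction: read every user's .csv for this sensor
    # (about 70 files in my case) and concatenate them into one dataframe.
    paths = glob.glob(os.path.join(PATH_DATA, data_type, "*.csv"))
    frames = [pd.read_csv(p) for p in paths]
    return pd.concat(frames, ignore_index=True)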

When testing with only one data_type and its 70 .csv files, I got messages saying Ray needs ~400 GB of memory. I tried dividing the work into chunks for testing, but I always get the same message.
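
By “dividing into chunks” I mean roughly this pattern (a simplified sketch, not the exact code I ran; the chunk size is arbitrary, and PATH_DATA / PATH_INTERMEDIATE are as above):

import gc
import glob
import os

# Process the .csv files in small batches instead of all 70 at once.
paths = sorted(glob.glob(os.path.join(PATH_DATA, 'Acceleration', '*.csv')))
chunk_size = 10
for i in range(0, len(paths), chunk_size):
    df = pd.concat([pd.read_csv(p) for p in paths[i:i + chunk_size]],
                   ignore_index=True)
    part = FUNC_PROC['Acceleration'](df)
    dump(part, os.path.join(PATH_INTERMEDIATE, f'proc_part_{i // chunk_size}.pkl'))
    del df, part
    gc.collect()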

After this, I tried using Modin for testing:

def _proc_acceleration(data: pd.DataFrame) -> Union[pd.Series, Dict[str, pd.Series]]:
    print_memory_usage("before computing the magnitude")
    data['mag'] = (data['x']**2 + data['y']**2 + data['z']**2)**0.5

    result = {
        'AXX': data['x'].astype('float32'),
        'AXY': data['y'].astype('float32'),
        'AXZ': data['z'].astype('float32'),
        'MAG': data['mag'].astype('float32')
    }

    return result
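
print_memory_usage is just a small helper that reports the current process memory; roughly this (a psutil-based sketch, the real helper may differ slightly):

import os
import psutil

def print_memory_usage(label: str) -> None:
    # Sketch of the memory-reporting helper used above: print the resident
    # set size (RSS) of the current Python process, in GB.
    rss = psutil.Process(os.getpid()).memory_info().rss
    print(f"[{label}] RSS: {rss / 1024**3:.2f} GB")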

DATA_TYPES = {
    'Acceleration': 'ACC',
}
FUNC_PROC = {
    'Acceleration': _proc_acceleration
}

def load_and_process(name: str):
    paths = _load_data_paths(name)
    df = paths_to_dataframe(paths)

    data_proc = FUNC_PROC[name](df)  # use the function argument, e.g. 'Acceleration'
    del df
    gc.collect()

    abbrev = DATA_TYPES[name]  # assumed: abbreviation taken from the DATA_TYPES mapping
    result = dict()

    if isinstance(data_proc, dict):
        for k, v in data_proc.items():
            result[f'{abbrev}_{k}'] = v
    else:
        result[abbrev] = data_proc

    del data_proc
    gc.collect()

    return result
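
For completeness, the test loop around load_and_process looks roughly like this (a simplified sketch; dump and PATH_INTERMEDIATE are the same as in the first snippet):

import gc
import os

results = dict()
for name in DATA_TYPES:
    # Modin already spreads the dataframe work across Ray workers,
    # so load_and_process is called directly here.
    results.update(load_and_process(name))
    gc.collect()   # memory usage keeps growing even after this call

dump(results, os.path.join(PATH_INTERMEDIATE, 'proc.pkl'))
del results
gc.collect()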

I tried a couple of iterations working with data from 2 .csv files, using 4 CPUs and 70 GB of memory. The tests show that memory is not freed after gc.collect(), and the job is killed because supposedly 165 GB of memory is needed, even though it is just 2 iterations reading only 2 .csv files (~300 MB) in total. I don’t know how to debug this error. Can you suggest some steps to do it?