I have a large file as input. Depending on the information in each line, the line is assigned to either type1 or type2. Each line is independent of the others, so I thought I could use Ray to do the processing in parallel.
I created one actor object per core, and lines are handed out round-robin to the objects, so in the end each core does the same amount of processing.
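To illustrate the distribution scheme I mean (a plain-Python sketch with made-up numbers, no Ray involved): line i goes to worker i % num_cpus, so the workers receive interleaved, equally sized shares of the input:

```python
# Round-robin assignment of line indices to workers (illustration only).
num_cpus = 4
lines = [f"line {i}" for i in range(10)]

# Worker j receives every line whose index i satisfies i % num_cpus == j.
assignments = [i % num_cpus for i, _ in enumerate(lines)]
print(assignments)
```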
All the workers should append the lines to either type1 or type2, so there would have to be shared variables. Alternatively, each worker could build its own two lists and combine them later, like a kind of MapReduce.
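The combine step I have in mind would look roughly like this (a sketch with made-up per-worker results, not my actual data): each worker returns its own (type1, type2) pair, and the driver concatenates them into two global lists:

```python
# Hypothetical per-worker results: each worker returns a (type1, type2) pair.
results = [(["a1"], ["b1", "b2"]), (["a2", "a3"], []), ([], ["b3"])]

# Reduce step: concatenate the per-worker lists into two global lists.
type1, type2 = [], []
for t1, t2 in results:
    type1.extend(t1)
    type2.extend(t2)
print(type1, type2)
```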
This is what I wrote:
```python
import ray
import psutil

num_cpus = psutil.cpu_count(logical=False)
ray.init(num_cpus=num_cpus)

@ray.remote
class DataSplit(object):
    def __init__(self):
        self.type1 = []
        self.type2 = []

    def split(self, data):
        if my_condition:
            self.type1.append(data)
        else:
            self.type2.append(data)

    def get_splits(self):
        return self.type1, self.type2

# One actor per physical core; lines are dispatched round-robin.
data_split = [DataSplit.remote() for _ in range(num_cpus)]

with open('myfile.txt') as infile:
    for i, line in enumerate(infile):
        data_split[i % num_cpus].split.remote(line)

# Collect each actor's (type1, type2) pair and concatenate them.
results = ray.get([actor.get_splits.remote() for actor in data_split])

type1, type2 = [], []
for t1, t2 in results:
    type1.extend(t1)
    type2.extend(t2)
```
The code works and every core is used, but it is terribly slow. I ran the same thing sequentially, just:
```python
type1, type2 = [], []
with open('myfile.txt') as infile:
    for line in infile:
        if my_condition:
            type1.append(line)
        else:
            type2.append(line)
```
and it is so much faster, so I must be doing something wrong. Can anyone help me? Thanks!