I have a large file as input. Depending on the information in each line, the line is assigned to type1 or type2. Each line is independent of the others, so I thought I could use Ray to do the processing in parallel.
I created an object of the class for each core, and the lines are distributed round-robin among these objects, so in the end each core does roughly the same amount of work.
All the workers should append the lines to either type1 or type2, so there would have to be shared variables. Alternatively, each worker could build its own lists and combine them afterwards, like a kind of MapReduce (see the sketch just below).
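In case it helps to picture it, here is a minimal sketch of that MapReduce-style variant using plain remote tasks instead of actors. is_type1 is a hypothetical predicate standing in for my real my_condition, and the chunk size is an arbitrary value for illustration:

import ray

ray.init()

# hypothetical predicate standing in for my real my_condition
def is_type1(line):
    return line.startswith('A')

@ray.remote
def split_chunk(lines):
    # each task builds its own local lists (the "map" step)
    type1, type2 = [], []
    for line in lines:
        if is_type1(line):
            type1.append(line)
        else:
            type2.append(line)
    return type1, type2

with open('myfile.txt') as infile:
    all_lines = infile.readlines()

# hand out chunks of lines rather than single lines, so each remote
# call amortizes the per-task overhead over many lines
chunk_size = 10000  # arbitrary, just for illustration
futures = [split_chunk.remote(all_lines[i:i + chunk_size])
           for i in range(0, len(all_lines), chunk_size)]

# combine the partial results (the "reduce" step)
type1, type2 = [], []
for part1, part2 in ray.get(futures):
    type1.extend(part1)
    type2.extend(part2)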
This is the actor-based version I actually wrote:
import ray
import psutil

num_cpus = psutil.cpu_count(logical=False)
ray.init(num_cpus=num_cpus)

@ray.remote
class DataSplit(object):
    def __init__(self):
        self.type1 = []
        self.type2 = []

    def split(self, data):
        # my_condition is a placeholder for my real test on the line
        if my_condition:
            self.type1.append(data)
        else:
            self.type2.append(data)

    def get_splits(self):
        return self.type1, self.type2

# one actor per physical core; lines are handed out round-robin
data_split = [DataSplit.remote() for _ in range(num_cpus)]
with open('myfile.txt') as infile:
    for i, line in enumerate(infile):
        data_split[i % num_cpus].split.remote(line)

# collect each actor's lists and merge them
results = ray.get([actor.get_splits.remote() for actor in data_split])
type1, type2 = [], []
for elem in results:
    type1.extend(elem[0])
    type2.extend(elem[1])
The code works and all the cores are used, but it is terribly slow. I ran the same thing sequentially, just:
type1, type2 = [], []
with open('myfile.txt') as infile:
    for line in infile:
        if my_condition:
            type1.append(line)
        else:
            type2.append(line)
and it is so much faster, so I must be doing something wrong. Can anyone help me? Thanks!