I have successfully implemented key phrase extraction code using two approaches:
- Reading data into Python data structures and processing it with a Ray Actor Pool.
- Reading data with Ray Dataset and processing it.
How do I decide between processing data with Python lists (handled by Ray actors) and Ray Dataset, considering performance and other aspects?
I am working on keyphrase extraction, which uses a spaCy model.
My data lives in S3, and I can store it there either as multiple JSON files or as Parquet. In this experiment, the Ray actor implementation reads JSON files and the Ray Dataset implementation reads Parquet files. Ray Dataset provides an API to read files directly from S3, which makes it easier. If I go ahead with the Ray Actor Pool, I would have to write another helper function for reading files from S3 (a sketch of such a helper appears after the Actor Pool snippet below).
I was hoping to learn some general guidelines for developing Ray applications, in this case choosing between Ray Dataset and Python data structures processed by Ray actors to accomplish my task.
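For reference, Ray Data can read either format directly from S3, so neither storage choice needs a custom loader on the Dataset side. A minimal sketch; the bucket and prefix are hypothetical placeholders:

import ray

# both readers accept s3:// URIs directly (bucket/prefix are placeholders)
ds_from_parquet = ray.data.read_parquet("s3://my-bucket/docs-parquet/")
ds_from_json = ray.data.read_json("s3://my-bucket/docs-json/")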
Sharing snippets of code below.
Using Ray Actor Pool:
import ray
import spacy
from ray.util import ActorPool

@ray.remote
class Textprocessor:
    def __init__(self):
        # setup instructions for the spaCy model
        import pytextrank
        self.nlp = spacy.load("en_core_web_lg")
        self.nlp.max_length = 1080000
        self.nlp.add_pipe("textrank")

    def process(self, args):
        ## some processing ###
        return

    def process_func2(self, args):
        ## some processing ###
        return

    def mainprocess(self, doc):
        ## some processing ###
        return keyphrases

documents = read_json(path)  # read_json is a utility function; documents is a list of dicts

# one actor per CPU in the cluster
actors = []
for _ in range(int(ray.cluster_resources()['CPU'])):
    actors.append(Textprocessor.remote())
pool = ActorPool(actors)

for output in pool.map_unordered(lambda a, v: a.mainprocess.remote(v), documents):
    ...  ## processing ###
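As mentioned above, the Actor Pool path needs its own S3 loader. A minimal sketch of such a helper, assuming boto3 is available and each file holds one JSON document; the bucket and prefix names are hypothetical:

import json
import boto3

def read_json_from_s3(bucket, prefix):
    # list every object under the prefix and parse each file as JSON
    s3 = boto3.client("s3")
    documents = []
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            body = s3.get_object(Bucket=bucket, Key=obj["Key"])["Body"].read()
            documents.append(json.loads(body))
    return documents

documents = read_json_from_s3("my-bucket", "docs-json/")  # hypothetical names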
Using Ray Datasets:
import copy

import pandas as pd
import ray
import spacy
from ray.data import ActorPoolStrategy

ds = ray.data.read_parquet("s3://PATH")

class TransformBatch:
    def __init__(self):
        import pytextrank
        self.nlp = spacy.load("en_core_web_lg")
        self.nlp.max_length = 1080000
        self.nlp.add_pipe("textrank")

    def process(self, args):
        ## some processing ###
        return

    def process_func2(self, args):
        ## some processing ###
        return

    def __call__(self, batch):
        batch = copy.deepcopy(batch)
        batch['lower_text'] = batch['text'].map(str.lower)
        batch['spacy_docs'] = batch['lower_text'].map(self.nlp)
        batch['doc_phrases'] = batch['spacy_docs'].map(self.process)
        batch['key_phrases'] = batch['doc_phrases'].map(self.process_func2)
        return pd.DataFrame(batch['key_phrases'])  # only need key phrases

ds = ds.map_batches(TransformBatch, compute=ActorPoolStrategy(6, 8), batch_format='pandas')
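One thing I noticed while writing this: batch['lower_text'].map(self.nlp) runs the pipeline one text at a time, while spaCy's nlp.pipe processes texts in batches. A sketch of __call__ rewritten around nlp.pipe, assuming process and process_func2 compose the same way as in the chain above:

def __call__(self, batch):
    texts = batch['text'].str.lower().tolist()
    # nlp.pipe streams the texts through the pipeline in internal batches
    docs = self.nlp.pipe(texts, batch_size=32)
    key_phrases = [self.process_func2(self.process(doc)) for doc in docs]
    return pd.DataFrame({'key_phrases': key_phrases})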