I recently read the blog post Turbocharge LangChain: Guide to 20x Faster Embedding, which shows how to speed up vector creation using Ray. However, the blog uses a cluster. Since I don't have access to a cluster, I tried a multi-GPU (4 GPU) instance from SageMaker Studio.
Vector database creation time dropped from 2 hours to 2 minutes, but I noticed it uses only the CPU, not the GPU. Could you please help me here?
Note: I have 20 PDF files with 50+ pages each.
Also, since I'm using SageMaker Studio to run the code from the blog, am I able to open the Ray dashboard to view the utilisation? Could you please also tell me how to view it?
I'm using the same code from the tutorial, copied directly from the blog. Since I don't have a cluster, I ran it on a multi-GPU instance from SageMaker.
import binascii
import io
import PyPDF2
import ray
from typing import List
from PyPDF2 import PdfReader
from sentence_transformers import SentenceTransformer
from ray.data.datasource import FileExtensionFilter
from langchain.text_splitter import RecursiveCharacterTextSplitter
ray.init(runtime_env={"pip": ["langchain", "pypdf", "sentence_transformers", "transformers","pycryptodome","PyPDF2"]})
# Filter out non-PDF files.
ds = ray.data.read_binary_files("s3://pdf-files-folder/", partition_filter=FileExtensionFilter("pdf"))
def convert_to_text(pdf_bytes: bytes):
pdf_bytes_io = io.BytesIO(pdf_bytes)
try:
pdf_doc = PdfReader(pdf_bytes_io)
    except PyPDF2.errors.PdfStreamError:
# Skip pdfs that are not readable.
# We still have over 30,000 pages after skipping these.
return []
text = []
for page in pdf_doc.pages:
try:
text.append(page.extract_text())
except binascii.Error:
# Skip all pages that are not parseable due to malformed characters.
print("parsing failed")
return text
# We use `flat_map` as `convert_to_text` has a 1->N relationship.
# It produces N strings for each PDF (one string per page).
# Use `map` for 1->1 relationship.
ds = ds.flat_map(convert_to_text)
def split_text(page_text: str):
# Use chunk_size of 1000.
# We felt that the answer we would be looking for would be
# around 200 words, or around 1000 characters.
# This parameter can be modified based on your documents and use case.
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000, chunk_overlap=100, length_function=len
)
split_text = text_splitter.split_text(page_text)
split_text = [text.replace("\n", " ") for text in split_text]
return split_text
# We use `flat_map` as `split_text` has a 1->N relationship.
# It produces N output chunks for each input string.
# Use `map` for 1->1 relationship.
ds = ds.flat_map(split_text)
model_name = "sentence-transformers/all-mpnet-base-v2"
model_kwargs = {"device": "cuda"}
class Embed:
def __init__(self):
# Specify "cuda" to move the model to GPU.
self.transformer = SentenceTransformer(model_name, device="cuda")
def __call__(self, text_batch: List[str]):
# We manually encode using sentence_transformer since LangChain
# HuggingfaceEmbeddings does not support specifying a batch size yet.
embeddings = self.transformer.encode(
text_batch,
batch_size=100, # Large batch size to maximize GPU utilization.
device="cuda",
).tolist()
return list(zip(text_batch, embeddings))
# Use `map_batches` since we want to specify a batch size to maximize GPU utilization.
ds = ds.map_batches(
Embed,
# Large batch size to maximize GPU utilization.
# Too large a batch size may result in GPU running out of memory.
# If the chunk size is increased, then decrease batch size.
# If the chunk size is decreased, then increase batch size.
batch_size=100, # Large batch size to maximize GPU utilization.
    compute=ray.data.ActorPoolStrategy(min_size=1, max_size=4),  # Scale the actor pool up to 4 GPU actors.
num_gpus=1, # 1 GPU for each actor.
)
from langchain import FAISS
from langchain.embeddings import HuggingFaceEmbeddings
text_and_embeddings = []
for output in ds.iter_rows():
text_and_embeddings.append(output)
vector_store = FAISS.from_embeddings(
text_and_embeddings,
# Provide the embedding model to embed the query.
# The documents are already embedded.
embedding=HuggingFaceEmbeddings(model_name=model_name)
)
# Persist the vector store.
vector_store.save_local("faiss_index")
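As a first sanity check, something like the following (my own sketch, not from the blog; it assumes torch is importable, since sentence_transformers depends on it) confirms whether a Ray task that requests a GPU can actually see one:
import ray
import torch

@ray.remote(num_gpus=1)
def gpu_check():
    # Ray sets CUDA_VISIBLE_DEVICES for this task based on num_gpus=1.
    return torch.cuda.is_available(), torch.cuda.device_count()

print(ray.get(gpu_check.remote()))  # Expect (True, 1) on a GPU instance.
If this prints (False, 0), the Embed actors will silently fall back to the CPU, for example because a CPU-only build of torch ended up in the runtime_env.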
Here is the code, modified from what you sent me:
import binascii
import ray, io
import PyPDF2
from PyPDF2 import PdfReader
from sentence_transformers import SentenceTransformer
from ray.data.datasource import FileExtensionFilter
from langchain.text_splitter import RecursiveCharacterTextSplitter
from typing import List
ray.init(runtime_env={"pip": ["langchain", "sentence_transformers", "transformers","pycryptodome","PyPDF2"]})
# Filter out non-PDF files.
ds = ray.data.read_binary_files("s3://ray-llm-batch-inference/", partition_filter=FileExtensionFilter("pdf"))
def convert_to_text(pdf_bytes: bytes):
pdf_bytes_io = io.BytesIO(pdf_bytes)
try:
pdf_doc = PdfReader(pdf_bytes_io)
except PyPDF2.errors.PdfReadError:
# Skip pdfs that are not readable.
# We still have over 30,000 pages after skipping these.
return []
text = []
for page in pdf_doc.pages:
try:
text.append(page.extract_text())
except binascii.Error:
# Skip all pages that are not parseable due to malformed characters.
print("parsing failed")
return text
# We use `flat_map` as `convert_to_text` has a 1->N relationship.
# It produces N strings for each PDF (one string per page).
# Use `map` for 1->1 relationship.
ds = ds.flat_map(convert_to_text)
def split_text(page_text: str):
# Use chunk_size of 1000.
# We felt that the answer we would be looking for would be
# around 200 words, or around 1000 characters.
# This parameter can be modified based on your documents and use case.
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000, chunk_overlap=100, length_function=len
)
split_text = text_splitter.split_text(page_text)
split_text = [text.replace("\n", " ") for text in split_text]
return split_text
# We use `flat_map` as `split_text` has a 1->N relationship.
# It produces N output chunks for each input string.
# Use `map` for 1->1 relationship.
ds = ds.flat_map(split_text)
model_name = "sentence-transformers/all-mpnet-base-v2"
model_kwargs = {"device": "cuda"}
class Embed:
def __init__(self):
# Specify "cuda" to move the model to GPU.
self.transformer = SentenceTransformer(model_name, device="cuda")
def __call__(self, text_batch: List[str]):
# We manually encode using sentence_transformer since LangChain
# HuggingfaceEmbeddings does not support specifying a batch size yet.
embeddings = self.transformer.encode(
text_batch,
batch_size=100, # Large batch size to maximize GPU utilization.
device="cuda",
).tolist()
return list(zip(text_batch, embeddings))
# Use `map_batches` since we want to specify a batch size to maximize GPU utilization.
ds = ds.map_batches(
Embed,
# Large batch size to maximize GPU utilization.
# Too large a batch size may result in GPU running out of memory.
# If the chunk size is increased, then decrease batch size.
# If the chunk size is decreased, then increase batch size.
batch_size=100, # Large batch size to maximize GPU utilization.
    compute=ray.data.ActorPoolStrategy(min_size=4, max_size=4),  # 4 GPUs on my instance, one actor per GPU.
num_gpus=1, # 1 GPU for each actor.
)
from langchain import FAISS
from langchain.embeddings import HuggingFaceEmbeddings
text_and_embeddings = []
for output in ds.iter_rows():
text_and_embeddings.append(output)
vector_store = FAISS.from_embeddings(
text_and_embeddings,
# Provide the embedding model to embed the query.
# The documents are already embedded.
embedding=HuggingFaceEmbeddings(model_name=model_name)
)
# Persist the vector store.
vector_store.save_local("faiss_index")
Thanks @amogkam. Yes, I hand-typed the code, so I may have missed something. It works for me now. However, is it possible to display the utilisation, i.e. to visually monitor the CPU and GPU usage, in SageMaker Studio?
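From what I can tell (a sketch of my own, not from the blog): ray.init() returns a context whose dashboard_url tells you where the Ray dashboard is serving, on port 8265 by default. In SageMaker Studio that port is not exposed directly, so it usually has to be reached through the Studio instance's Jupyter server proxy; the exact proxy path depends on your setup.
import ray

# Minimal sketch: find out where the Ray dashboard is listening.
context = ray.init(
    dashboard_host="0.0.0.0",  # bind beyond localhost so a proxy can reach it
    dashboard_port=8265,       # the default dashboard port
)
print(context.dashboard_url)
The dashboard's cluster view then shows per-node CPU and GPU utilisation, which would also answer whether the embedding actors are running on the GPUs.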
I did a verification after creating the chunks from all the PDF files, and the size of each chunk varies. In the code above I set chunk_size=1000 with chunk_overlap=100, but when I iterated over the chunks to check their lengths, they came out as 546, 92, 253, 756, and so on.
But when I ran another experiment with the traditional LangChain approach using CharacterTextSplitter, the lengths came out as 9623, 9462, 9745, and so on, which actually seems better to me.
Could you please check and confirm whether I'm making any mistakes?
I think this is just the nature of how RecursiveCharacterTextSplitter works compared to CharacterTextSplitter. What are you seeing when you use RecursiveCharacterTextSplitter with LangChain directly?
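For what it's worth, the difference is easy to reproduce with a minimal sketch of my own (not from this thread). CharacterTextSplitter only splits on "\n\n" by default, so when the extracted text contains few paragraph breaks it emits oversized chunks (with a warning), while RecursiveCharacterTextSplitter falls back to "\n", then " ", then "", so its chunks always respect chunk_size and often end well under it at natural boundaries:
from langchain.text_splitter import (
    CharacterTextSplitter,
    RecursiveCharacterTextSplitter,
)

# PDF-extracted text often has almost no "\n\n" paragraph breaks.
text = "word " * 2000  # roughly 10,000 characters, no "\n\n" anywhere

char_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
rec_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)

# No "\n\n" present, so the whole text comes back as one oversized chunk.
print([len(c) for c in char_splitter.split_text(text)])
# Falls back to splitting on " ", so every chunk is <= 1000 characters.
print([len(c) for c in rec_splitter.split_text(text)][:5])
So lengths like 9623 mean CharacterTextSplitter could not enforce the 1000-character limit, while the shorter, variable lengths are RecursiveCharacterTextSplitter respecting it.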
Hello, I am experimenting with your code and I keep running into the following error:
Standalone Python objects are not allowed in Ray 2.5. To return Python objects from map(), wrap them in a dict, e.g., return {'item': item} instead of just item.
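In case it helps, here is a rough sketch (untested, based only on what the error message asks for) of how the functions above would change for Ray >= 2.5, where map()/flat_map() must return dicts and map_batches() callables receive a dict of columns:
def convert_to_text(row):
    # read_binary_files now yields rows like {"bytes": b"..."}.
    # extract_pages is a hypothetical stand-in for the PdfReader logic above.
    pages = extract_pages(row["bytes"])
    return [{"text": page_text} for page_text in pages]

def split_text(row):
    chunks = text_splitter.split_text(row["text"])
    return [{"text": chunk.replace("\n", " ")} for chunk in chunks]

class Embed:
    def __call__(self, batch):
        # batch is a dict of columns, e.g. {"text": <array of strings>}.
        embeddings = self.transformer.encode(
            list(batch["text"]), batch_size=100, device="cuda"
        )
        return {"text": batch["text"], "embeddings": embeddings}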