Turbocharge LangChain: guide to 20x faster embedding

Hi,

I recently read the blog post
Turbocharge LangChain: guide to 20x faster embedding | Anyscale, which shows how to speed up vector creation using Ray. However, the blog uses a cluster. Since I don't have access to a cluster, I tried a multi-GPU (4 GPU) instance from SageMaker Studio.

I noticed the vector database creation time dropped from 2 hours to 2 minutes, but it only uses the CPU, not the GPU. Could you please help me here?
Note: I have 20 PDF files with 50+ pages each.

Also, since I'm using SageMaker Studio to run the code from the blog, I'm not able to open the Ray dashboard to view the utilisation. Could you please also tell me how to view the utilisation?

Thanks

Hey @IamExperimentingNow,

Could you share the following info?

  1. The code that you are using
  2. Which graph in particular are you looking at to determine that GPU is not being used?

I'm using the same code from your tutorial; I copied it directly from the blog. Since I don't have a cluster, I tried it on a multi-GPU instance from SageMaker.

And what are you looking at to see that the GPU is not being used? Is this through nvidia-smi?

The code that’s in the tutorial is set for 20 GPUs. Are you decreasing this number?
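For reference, on a 4-GPU machine decreasing it would mean sizing the actor pool to 4 so each actor can claim one GPU. A minimal sketch of that change (based on the map_batches call from the blog; batch_size kept at 100):

# On a 4-GPU instance, size the actor pool to match the number of GPUs
# so each actor is scheduled onto its own GPU.
ds = ds.map_batches(
    Embed,
    batch_size=100,
    compute=ray.data.ActorPoolStrategy(min_size=4, max_size=4),  # 4 GPUs on this machine
    num_gpus=1,  # each actor requests one GPU
)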

Yes, I used nvidia-smi to see GPU utilization, and I reduced the number to 4 since I have 4 GPUs in my instance.

Would you be able to share your modified code?

Please find the modified code below:

import ray, io
from PyPDF2 import PdfReader
from sentence_transformers import SentenceTransformer
from ray.data.datasource import FileExtensionFilter
from langchain.text_splitter import RecursiveCharacterTextSplitter

ray.init(runtime_env={"pip": ["langchain", "pypdf", "sentence_transformers", "transformers","pycryptodome","PyPDF2"]})

# Filter out non-PDF files.
ds = ray.data.read_binary_files("s3://pdf-files-folder/", partition_filter=FileExtensionFilter("pdf"))

def convert_to_text(pdf_bytes: bytes):
    pdf_bytes_io = io.BytesIO(pdf_bytes)

    try:
        pdf_doc = PdfReader(pdf_bytes_io)
    except pypdf.errors.PdfStreamError:
        # Skip pdfs that are not readable.
        # We still have over 30,000 pages after skipping these.
        return []

    text = []
    for page in pdf_doc.pages:
        try:
            text.append(page.extract_text())
        except binascii.Error:
            # Skip all pages that are not parseable due to malformed characters.
            print("parsing failed")
    return text

# We use `flat_map` as `convert_to_text` has a 1->N relationship.
# It produces N strings for each PDF (one string per page).
# Use `map` for 1->1 relationship.
ds = ds.flat_map(convert_to_text)

def split_text(page_text: str):
    # Use chunk_size of 1000.
    # We felt that the answer we would be looking for would be 
    # around 200 words, or around 1000 characters.
    # This parameter can be modified based on your documents and use case.
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000, chunk_overlap=100, length_function=len
    )
    split_text = text_splitter.split_text(page_text)

    split_text = [text.replace("\n", " ") for text in split_text]
    return split_text

# We use `flat_map` as `split_text` has a 1->N relationship.
# It produces N output chunks for each input string.
# Use `map` for 1->1 relationship.
ds = ds.flat_map(split_text)

model_name = "sentence-transformers/all-mpnet-base-v2"
model_kwargs = {"device": "cuda"}
class Embed:
    def __init__(self):
        # Specify "cuda" to move the model to GPU.
        self.transformer = SentenceTransformer(model_name, device="cuda")

    def __call__(self, text_batch: List[str]):
        # We manually encode using sentence_transformer since LangChain
        # HuggingfaceEmbeddings does not support specifying a batch size yet.
        embeddings = self.transformer.encode(
            text_batch,
            batch_size=100,  # Large batch size to maximize GPU utilization.
            device="cuda",
        ).tolist()

        return list(zip(text_batch, embeddings))

# Use `map_batches` since we want to specify a batch size to maximize GPU utilization.
ds = ds.map_batches(
    Embed,
    # Large batch size to maximize GPU utilization.
    # Too large a batch size may result in GPU running out of memory.
    # If the chunk size is increased, then decrease batch size.
    # If the chunk size is decreased, then increase batch size.
    batch_size=100,  # Large batch size to maximize GPU utilization.
    compute=ray.data.ActorPoolStrategy(min_size=1, max_size=4),  # I have 20 GPUs in my cluster
    num_gpus=1,  # 1 GPU for each actor.
)

from langchain import FAISS
from langchain.embeddings import HuggingFaceEmbeddings

text_and_embeddings = []
for output in ds.iter_rows():
    text_and_embeddings.append(output)

vectore_store = FAISS.from_embeddings(
    text_and_embeddings,
    # Provide the embedding model to embed the query.
    # The documents are already embedded.
    embedding=HuggingFaceEmbeddings(model_name=model_name)
)

# Persist the vector store.
vectore_store.save_local("faiss_index")

Thanks @IamExperimentingNow!

Are you sure the actual computation is being run in the code you sent? There were some errors when I tried it.

I made the following changes to your code:

  1. Fixing some import errors
  2. Setting both min_size=4 and max_size=4

Once I do that, I'm seeing full GPU utilization across all my GPUs.

The code I modified from what you sent me is here:

import binascii
import ray, io
import PyPDF2
from PyPDF2 import PdfReader
from sentence_transformers import SentenceTransformer
from ray.data.datasource import FileExtensionFilter
from langchain.text_splitter import RecursiveCharacterTextSplitter

from typing import List


ray.init(runtime_env={"pip": ["langchain", "sentence_transformers", "transformers","pycryptodome","PyPDF2"]})

# Filter out non-PDF files.
ds = ray.data.read_binary_files("s3://ray-llm-batch-inference/", partition_filter=FileExtensionFilter("pdf"))

def convert_to_text(pdf_bytes: bytes):
    pdf_bytes_io = io.BytesIO(pdf_bytes)

    try:
        pdf_doc = PdfReader(pdf_bytes_io)
    except PyPDF2.errors.PdfReadError:
        # Skip pdfs that are not readable.
        # We still have over 30,000 pages after skipping these.
        return []

    text = []
    for page in pdf_doc.pages:
        try:
            text.append(page.extract_text())
        except binascii.Error:
            # Skip all pages that are not parseable due to malformed characters.
            print("parsing failed")
    return text

# We use `flat_map` as `convert_to_text` has a 1->N relationship.
# It produces N strings for each PDF (one string per page).
# Use `map` for 1->1 relationship.
ds = ds.flat_map(convert_to_text)

def split_text(page_text: str):
    # Use chunk_size of 1000.
    # We felt that the answer we would be looking for would be 
    # around 200 words, or around 1000 characters.
    # This parameter can be modified based on your documents and use case.
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000, chunk_overlap=100, length_function=len
    )
    split_text = text_splitter.split_text(page_text)

    split_text = [text.replace("\n", " ") for text in split_text]
    return split_text

# We use `flat_map` as `split_text` has a 1->N relationship.
# It produces N output chunks for each input string.
# Use `map` for 1->1 relationship.
ds = ds.flat_map(split_text)

model_name = "sentence-transformers/all-mpnet-base-v2"
model_kwargs = {"device": "cuda"}
class Embed:
    def __init__(self):
        # Specify "cuda" to move the model to GPU.
        self.transformer = SentenceTransformer(model_name, device="cuda")

    def __call__(self, text_batch: List[str]):
        # We manually encode using sentence_transformer since LangChain
        # HuggingfaceEmbeddings does not support specifying a batch size yet.
        embeddings = self.transformer.encode(
            text_batch,
            batch_size=100,  # Large batch size to maximize GPU utilization.
            device="cuda",
        ).tolist()

        return list(zip(text_batch, embeddings))

# Use `map_batches` since we want to specify a batch size to maximize GPU utilization.
ds = ds.map_batches(
    Embed,
    # Large batch size to maximize GPU utilization.
    # Too large a batch size may result in GPU running out of memory.
    # If the chunk size is increased, then decrease batch size.
    # If the chunk size is decreased, then increase batch size.
    batch_size=100,  # Large batch size to maximize GPU utilization.
    compute=ray.data.ActorPoolStrategy(min_size=4, max_size=4),  # I have 4 GPUs in my cluster
    num_gpus=1,  # 1 GPU for each actor.
)

from langchain import FAISS
from langchain.embeddings import HuggingFaceEmbeddings

text_and_embeddings = []
for output in ds.iter_rows():
    text_and_embeddings.append(output)

vectore_store = FAISS.from_embeddings(
    text_and_embeddings,
    # Provide the embedding model to embed the query.
    # The documents are already embedded.
    embedding=HuggingFaceEmbeddings(model_name=model_name)
)

# Persist the vector store.
vectore_store.save_local("faiss_index")
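
As a follow-up, once the index is persisted you can reload it and query it in a later session. A minimal sketch (assuming the same faiss_index folder and embedding model as above; the query string is just a placeholder):

from langchain import FAISS
from langchain.embeddings import HuggingFaceEmbeddings

# Reload the persisted index; queries are embedded with the same model
# that was used to embed the documents.
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")
vector_store = FAISS.load_local("faiss_index", embeddings)

results = vector_store.similarity_search("example query about your PDFs", k=4)
for doc in results:
    print(doc.page_content[:200])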

Thanks @amogkam! Yes, I hand-typed the code, so I might have missed something. It works for me now :slight_smile: However, is it possible to display the utilisation, i.e. to visually monitor the CPU and GPU usage, in SageMaker Studio?

I'm not too familiar with SageMaker Studio, but if you have the Ray dashboard up and running, you can view the GPU utilization from the cluster tab.

https://docs.ray.io/en/latest/ray-core/ray-dashboard.html#dash-workflow-resource-utilization
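
If the dashboard isn't reachable from your environment, a rough alternative is to poll Ray's resource accounting from the driver process. A minimal sketch (note this only shows how many GPUs Ray has allocated to actors/tasks, not actual utilization, which still needs nvidia-smi):

import time
import ray

# Compare total vs. currently available resources to see how many GPUs
# Ray has handed out to actors/tasks at this moment.
for _ in range(5):
    total = ray.cluster_resources().get("GPU", 0)
    free = ray.available_resources().get("GPU", 0)
    print(f"GPUs in use: {total - free} / {total}")
    time.sleep(5)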

I'm not able to open the Ray dashboard from SageMaker Studio. Thanks though; if you find any workaround, please let me know.

Hey @amogkam, I just noticed while experimenting with your blog code that the chunk size is not the size we specify in

text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000, chunk_overlap=100, length_function=len
    )

I verified this after creating the chunks from all the PDF files: the length of each chunk varies. In the code above I set chunk_size=1000 with chunk_overlap=100, but when I iterated over the chunks and checked their lengths, they came out as 546, 92, 253, 756, and so on.

But I ran another experiment with the traditional LangChain approach using CharacterTextSplitter, and it returned values closer to 1000, like 9623, 9462, 9745, and so on, which is actually better.

Could you please check and confirm whether I'm making any mistakes?

@amogkam, did you get a chance to look into this issue? After the text split, the chunk size is not what we specify.

I think this is just the nature of how RecursiveCharacterTextSplitter works compared to CharacterTextSplitter. What are you seeing when you use RecursiveCharacterTextSplitter with LangChain directly?
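
To make the difference concrete, here's a small standalone comparison you could run (a sketch with a made-up sample string; the exact lengths will depend on your text). RecursiveCharacterTextSplitter treats chunk_size as an upper bound and prefers to break on paragraph/line/word boundaries, so many chunks come out shorter than 1000, while CharacterTextSplitter only splits on a single separator and will keep oversized pieces whole (which would explain lengths like 9623):

from langchain.text_splitter import RecursiveCharacterTextSplitter, CharacterTextSplitter

# Made-up sample: a few paragraphs of very different lengths.
sample = "\n\n".join(["word " * n for n in (50, 300, 30, 400)])

recursive = RecursiveCharacterTextSplitter(
    chunk_size=1000, chunk_overlap=100, length_function=len
)
character = CharacterTextSplitter(
    separator="\n\n", chunk_size=1000, chunk_overlap=100, length_function=len
)

print([len(c) for c in recursive.split_text(sample)])   # every chunk <= 1000, many shorter
print([len(c) for c in character.split_text(sample)])   # splits only on "\n\n"; long paragraphs stay whole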

@IamExperimentingNow, can you give us a high-level overview of how exactly you used SageMaker for creating embeddings?