Using Redpanda, OpenAI, and MongoDB to generate vector embeddings on streaming text

Streaming text embeddings for Retrieval Augmented Generation (RAG)

Redpanda Data
8 min read · Jun 5, 2024

Author: James Kinley

This isn’t just another post about Retrieval Augmented Generation (RAG). It focuses on how to create text embeddings on streaming data. I’ll provide an example of how to stream documents into Redpanda (a supercharged Kafka-compatible streaming data platform) and then enrich those documents with OpenAI text embeddings using Redpanda Connect, a new product that we announced after Redpanda acquired Benthos (a simple stream processing framework).

The example then adds the enriched documents to a vector database, which is used to complete the retrieval pipeline by querying for relevant texts to add context to a Large Language Model (LLM) prompt. Before we dig in, let’s establish some context.

A quick run through RAG

OpenAI defines RAG as follows:

“RAG is the process of retrieving relevant contextual information from a data source and passing that information to a large language model alongside the user’s prompt. This information is used to improve the model’s output by augmenting the model’s base knowledge.”

RAG comprises two parts:

  1. The acquisition and persistence of new information that an LLM has no prior knowledge of. This new information (documents, webpages, etc.) is split into smaller chunks of text and stored in a vector database alongside its vector embeddings. This is the part we’re going to use stream processing for.
  2. The retrieval of relevant contextual information from the vector database (semantic search), and the passing of that context alongside a user prompt to the LLM to improve the model’s generated answer.

Vector embeddings are a mathematical representation of text that encodes its semantic meaning in a multidimensional vector space. Words with a similar meaning are clustered closer together in this space (i.e. they have similar coordinates). Vector databases can query vector embeddings to retrieve texts with a similar semantic meaning. A famous example of this is the vector arithmetic king - man + woman = queen (Grant Sanderson).
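
To make the idea of "similar coordinates" concrete, here is a minimal sketch using hand-made three-dimensional vectors. The numbers are invented for illustration (real embeddings have hundreds or thousands of dimensions and come from a model), and the helper names are hypothetical.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two vectors (1.0 means same direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

# Toy "embeddings" with made-up axes: (royalty, maleness, femaleness).
toy_vectors = {
    "king":  [0.9, 0.8, 0.1],
    "queen": [0.9, 0.1, 0.8],
    "man":   [0.1, 0.9, 0.1],
    "woman": [0.1, 0.1, 0.9],
    "apple": [0.0, 0.2, 0.2],
}

def analogy(a, b, c, vectors):
    """Return the word whose vector is closest to a - b + c, excluding the inputs."""
    target = [x - y + z for x, y, z in zip(vectors[a], vectors[b], vectors[c])]
    candidates = {w: v for w, v in vectors.items() if w not in (a, b, c)}
    return max(candidates, key=lambda w: cosine_similarity(target, candidates[w]))

result = analogy("king", "man", "woman", toy_vectors)
print(result)
```

With these toy values the nearest remaining word to king - man + woman is "queen". A vector database performs the same kind of nearest-neighbour comparison, only at scale and with an index.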

Now that the context has been set (pun intended), let's walk through an example of how to build the stream processing pipeline. All the code included in this article can be found in this GitHub repository: jrkinley/benthos-embeddings.

Building the stream processing pipeline with Redpanda, OpenAI, and MongoDB

There are a few prerequisites to get this working, but all the instructions are provided in the README, and they’re easy to follow. In summary, the example uses three online services and we need authentication credentials for each of them:

  1. OpenAI API: for requesting text embeddings and model generations.
  2. Redpanda Serverless: for fast streaming of text documents.
  3. MongoDB Atlas: for vector storage and semantic search.

Clone the GitHub repository and add the prerequisite credentials to a file named .env in the demo directory as follows:

REDPANDA_SERVERS="<bootstrap_server_url>"
REDPANDA_USER="<username>"
REDPANDA_PASS="<password>"
REDPANDA_TOPICS="documents"

OPENAI_API_KEY="<secret_key>"
OPENAI_EMBEDDING_MODEL="text-embedding-3-small"
OPENAI_MODEL="gpt-4o"

ATLAS_CONNECTION_STRING="mongodb+srv://<username>:<password>@<...>.mongodb.net/?retryWrites=false"
ATLAS_DB="VectorStore"
ATLAS_COLLECTION="Embeddings"
ATLAS_INDEX="vector_index"
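
Since every later step depends on these values, it can help to sanity-check the file before running anything. The following is a minimal sketch, not part of the repository: the helper names are hypothetical, and it assumes the simple KEY="value" layout shown above.

```python
REQUIRED_VARS = [
    "REDPANDA_SERVERS", "REDPANDA_USER", "REDPANDA_PASS", "REDPANDA_TOPICS",
    "OPENAI_API_KEY", "OPENAI_EMBEDDING_MODEL", "OPENAI_MODEL",
    "ATLAS_CONNECTION_STRING", "ATLAS_DB", "ATLAS_COLLECTION", "ATLAS_INDEX",
]

def parse_env_file(text):
    """Parse KEY="value" lines, skipping blank lines and # comments."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        # Split on the first "=" only, so values such as connection
        # strings may contain "=" themselves.
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip().strip('"')
    return values

def missing_vars(values, required=REQUIRED_VARS):
    """Return the required names that are absent or empty."""
    return [name for name in required if not values.get(name)]
```

For example, missing_vars(parse_env_file(open(".env").read())) returns an empty list when all eleven variables are set.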

Before we get started in earnest, the final step to complete the setup is to create a Python virtual environment and install the packages listed in requirements.txt:

% cd demo
% python3 -m venv env
% source env/bin/activate
(env) % pip install -r requirements.txt

1. Query ChatGPT for a baseline

What we’re trying to achieve in this example is to demonstrate how to use streaming text to improve the generated responses coming from ChatGPT. On 13 May 2024, OpenAI released its latest model gpt-4o, which at the time of writing this article has been trained on data up to October 2023.

So the model is unlikely to have any knowledge of events after this date, and is consequently unlikely to generate a meaningful response to a question that references them. This is why RAG exists!

To begin, let’s query ChatGPT to get a baseline answer to a question about a recent event. On 21 May 2024 the England National Football Team announced its provisional men's squad for the upcoming UEFA Euro 2024 competition.

Let’s ask ChatGPT to list the squad.

(env) % python retrieve_generate.py -q """
Which football players made the provisional England national squad
for the Euro 2024 tournament, and who didn't make the cut?"""

Initial answer:
As of my last update in October 2023, the provisional England national
squad for the Euro 2024 tournament has not yet been announced. Squad
selections for major tournaments like the Euros are typically made a few
months before the event begins, often in the spring or early summer of the
tournament year.

To get the most accurate and up-to-date information regarding the
provisional squad and which players made the cut or were left out, I
recommend checking the latest news from official sources such as the
Football Association (FA) website, reputable sports news outlets, or the
official social media channels of the England national team. These sources
will provide the most current details on player selections and any updates
related to the squad.

As anticipated, ChatGPT has no prior knowledge of this information so it generates a helpful, but somewhat generic response to the question. Now, let’s build the streaming RAG pipeline to augment ChatGPT’s knowledge with the relevant context and see if it can generate an improved answer.

2. Stream text into Redpanda with LangChain

LangChain is a framework and collection of tools for building AI applications.

In this example, we use LangChain’s WebBaseLoader to load an HTML page from the BBC Sport website that captures England’s Euro 2024 squad announcement, as well as some other news about surprise omissions from the squad. LangChain parses the HTML with Python’s BeautifulSoup library to extract the text.

from langchain_community.document_loaders import WebBaseLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
...
loader = WebBaseLoader(url)
loader.requests_kwargs = {"verify": True}
docs = loader.load()
...
splitter = RecursiveCharacterTextSplitter(
    chunk_size=2000, chunk_overlap=200)
chunks = splitter.split_documents(docs)
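
To see what chunk_size and chunk_overlap mean in practice, here is a deliberately simplified stand-in. It is not LangChain's algorithm (RecursiveCharacterTextSplitter prefers to break on separators such as paragraphs and sentences); this hypothetical function just slides a fixed-size window over the text so that neighbouring chunks share some characters.

```python
def split_with_overlap(text, chunk_size=2000, chunk_overlap=200):
    """Split text into fixed-size windows that overlap by chunk_overlap characters."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once the current window has reached the end of the text.
        if start + chunk_size >= len(text):
            break
    return chunks

# Small numbers make the overlap easy to see.
sample = "".join(str(i % 10) for i in range(25))
pieces = split_with_overlap(sample, chunk_size=10, chunk_overlap=3)
for piece in pieces:
    print(piece)
```

The last three characters of each piece reappear at the start of the next one, which reduces the chance that a sentence relevant to a query is cut in half at a chunk boundary.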

The text is split into smaller chunks using LangChain’s RecursiveCharacterTextSplitter to make sure it fits into ChatGPT’s context window, before the chunks are sent to Redpanda ready for adding text embeddings.

(env) % python produce_documents.py \
--url "https://www.bbc.co.uk/sport/football/articles/c3gglr8mpzdo"

Prepared 7 documents
Producing to Kafka: 100%|################| 7/7 [00:00<00:00, 17199.84it/s]
Sent 7 documents to Kafka topic: documents
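
For readers curious about what a producer script of this kind might do internally, here is a rough sketch. It is an assumption-laden illustration, not the repository's produce_documents.py: it assumes the confluent-kafka package, SASL/SCRAM-SHA-256 over TLS for authentication, and a JSON message with "text" and "metadata" fields. The function names are hypothetical.

```python
import json

def to_message(chunk_text, metadata):
    """Serialize one chunk as a JSON-encoded message value (assumed shape)."""
    return json.dumps({"text": chunk_text, "metadata": metadata}).encode("utf-8")

def produce_chunks(chunks, topic, servers, user, password):
    """Send a list of (text, metadata) pairs to a Kafka-compatible topic."""
    # Imported lazily so the serialization helper works without the package.
    from confluent_kafka import Producer

    producer = Producer({
        "bootstrap.servers": servers,
        "security.protocol": "SASL_SSL",    # assumption: TLS + SASL
        "sasl.mechanism": "SCRAM-SHA-256",  # assumption: SCRAM credentials
        "sasl.username": user,
        "sasl.password": password,
    })
    for text, metadata in chunks:
        producer.produce(topic, value=to_message(text, metadata))
    producer.flush()  # block until queued messages are delivered or fail
    return len(chunks)
```

Keeping serialization separate from sending makes the message shape easy to inspect without a broker.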

3. Generate text embeddings with Redpanda Connect

With chunks of text now streaming into a Redpanda topic, the next step is to use Benthos (now known as Redpanda Connect) to consume the events from the topic and pass the text values through a stream processor that calls OpenAI’s embeddings API to retrieve the vector embeddings. The enriched events (text + embeddings + metadata) are then inserted into the MongoDB Atlas database collection where they are indexed and made available for semantic search.

% export $(grep -v '^#' ./demo/.env | xargs)
% go test

PASS
ok benthos-embeddings 2.837s

% go build
% ./benthos-embeddings -c demo/rag_demo.yaml --log.level debug

INFO Running main config from specified file @service=benthos benthos_version=v4.27.0 path=demo/rag_demo.yaml
INFO Listening for HTTP requests at: http://0.0.0.0:4195 @service=benthos
DEBU url: https://api.openai.com/v1/embeddings, model: text-embedding-3-small @service=benthos label="" path=root.pipeline.processors.0
INFO Launching a benthos instance, use CTRL+C to close @service=benthos
INFO Input type kafka is now active @service=benthos label="" path=root.input
DEBU Starting consumer group @service=benthos label="" path=root.input
INFO Output type mongodb is now active @service=benthos label="" path=root.output
DEBU Consuming messages from topic 'documents' partition '0' @service=benthos label="" path=root.input

Note that the Benthos configuration rag_demo.yaml uses variable interpolation, so it’s important not to skip over the export command that sets the environment variables listed in demo/.env.
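
The enrichment itself happens in a custom Go processor inside the pipeline, but the logic can be sketched in a few lines of Python. This is an illustrative equivalent under stated assumptions, not the processor's code: the function names are hypothetical, and the OpenAI call assumes the official openai Python package (v1 client) with OPENAI_API_KEY set in the environment.

```python
def enrich_with_embedding(document, embed):
    """Return a copy of the document with an 'embedding' field added.

    `embed` is any callable that maps a string to a list of floats.
    """
    enriched = dict(document)
    enriched["embedding"] = embed(document["text"])
    return enriched

def openai_embed(text, model="text-embedding-3-small"):
    """Request an embedding vector for `text` from OpenAI's embeddings API."""
    from openai import OpenAI  # imported lazily; third-party package

    client = OpenAI()  # reads OPENAI_API_KEY from the environment
    response = client.embeddings.create(model=model, input=text)
    return response.data[0].embedding
```

Calling enrich_with_embedding(doc, openai_embed) on each consumed message would yield the text + embeddings + metadata shape that is written to the vector store. Passing the embedding function as a parameter also makes the step easy to test with a stub.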

This completes the first part of the RAG pipeline. Text is now streaming into Redpanda, being enriched with text embeddings from OpenAI, and being inserted into the vector store, where the stored documents look like this:

_id: 6656175c20b49dd898e29105
embedding: Array (1536)
metadata: Object
  description: "Marcus Rashford and Jordan Henderson are left out of Gareth Southgate'…"
  document_id: "TtX1mLEFGYEVXYG2Ue0yfJC3OeKTmrsNbt/F5eIhGlE="
  language: "en-GB"
  source: "https://www.bbc.co.uk/sport/football/articles/c3gglr8mpzdo"
  title: "England Euro 2024 squad: Marcus Rashford and Jordan Henderson left out…"
text: "England Euro 2024 squad: Marcus Rashford and Jordan Henderson left out…"

The embedding field is an array of 1,536 doubles, which is the length of the embedding vector for OpenAI’s text-embedding-3-small model.
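
The vector length matters because an Atlas Vector Search index has to declare it. As a hedged sketch (the repository's README is the authority on how the index is actually created), the definition could be built and registered from Python as follows. The "cosine" similarity choice and the helper names are assumptions, and the type="vectorSearch" argument requires a recent PyMongo release.

```python
def vector_index_definition(path="embedding", dimensions=1536, similarity="cosine"):
    """Build an Atlas Vector Search index definition for one vector field."""
    return {
        "fields": [
            {
                "type": "vector",
                "path": path,                 # document field holding the vector
                "numDimensions": dimensions,  # must match the embedding model
                "similarity": similarity,     # assumption: cosine similarity
            }
        ]
    }

def create_vector_index(connection_string, db, collection, index_name):
    """Create the search index on an Atlas collection and return its name."""
    # Imported lazily; both come from the third-party pymongo package.
    from pymongo import MongoClient
    from pymongo.operations import SearchIndexModel

    coll = MongoClient(connection_string)[db][collection]
    model = SearchIndexModel(
        definition=vector_index_definition(),
        name=index_name,
        type="vectorSearch",
    )
    return coll.create_search_index(model)
```

If the embedding model is changed to one with a different vector length, numDimensions has to change with it.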

4. Augmented model generation

The final piece of the puzzle that brings this all together is to resend the original question to ChatGPT, but this time add some relevant context to the prompt by retrieving similar texts from the vector store. Again, the example uses LangChain to help us build the retrieval chain:

import os

from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_mongodb import MongoDBAtlasVectorSearch
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.chains.chat_vector_db import prompts

# Connect to the Atlas collection that holds the enriched documents
vector_search = MongoDBAtlasVectorSearch.from_connection_string(
    os.getenv("ATLAS_CONNECTION_STRING"),
    f"{os.getenv('ATLAS_DB')}.{os.getenv('ATLAS_COLLECTION')}",
    OpenAIEmbeddings(model=os.getenv("OPENAI_EMBEDDING_MODEL")),
    index_name=os.getenv("ATLAS_INDEX")
)

# Retrieve texts that are semantically similar to the question
retriever = vector_search.as_retriever(
    search_type="similarity",
    search_kwargs={"score_threshold": 0.75}
)

llm = ChatOpenAI(model=os.getenv("OPENAI_MODEL"))

def format_docs(docs):
    # Join the retrieved chunks into a single context string
    return "\n".join(doc.page_content for doc in docs)

# Build the RAG chain
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompts.QA_PROMPT
    | llm
    | StrOutputParser()
)
answer = rag_chain.invoke(query)
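
Conceptually, the retrieval step compares the question's embedding with the stored embeddings, keeps the closest matches above a score threshold, and pastes their text into the prompt. The sketch below shows that idea in memory with toy two-dimensional vectors. It is a simplification, not what LangChain or Atlas execute: the function names, the k=4 limit, and the prompt wording are all assumptions made for illustration.

```python
import math

def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))

def retrieve(query_vec, store, score_threshold=0.75, k=4):
    """Return up to k texts whose similarity to the query meets the threshold."""
    scored = [(cosine(query_vec, doc["embedding"]), doc["text"]) for doc in store]
    kept = [pair for pair in scored if pair[0] >= score_threshold]
    kept.sort(key=lambda pair: pair[0], reverse=True)  # best match first
    return [text for _, text in kept[:k]]

def build_prompt(question, texts):
    """Combine retrieved texts and the question (hypothetical prompt wording)."""
    context = "\n".join(texts)
    return f"Use the following context to answer the question.\n\n{context}\n\nQuestion: {question}"

# Toy store: two relevant chunks and one unrelated chunk.
store = [
    {"text": "squad announcement", "embedding": [1.0, 0.0]},
    {"text": "weather report", "embedding": [0.0, 1.0]},
    {"text": "omitted players", "embedding": [0.9, 0.3]},
]
matches = retrieve([1.0, 0.1], store)
print(build_prompt("Who made the squad?", matches))
```

Here the unrelated chunk falls below the 0.75 threshold and is left out of the prompt, so the model only sees context that is close to the question.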

Do we get a better answer?

Yes! ChatGPT uses the context provided in the prompt to generate a response that lists the England football players that made the squad, and the notable players that didn’t make the cut:

(env) % python retrieve_generate.py -q """
Which football players made the provisional England national squad
for the Euro 2024 tournament, and who didn't make the cut?"""

Augmented answer:
The football players who made the provisional England national squad for
the Euro 2024 tournament are:

**Goalkeepers:**
- Dean Henderson (Crystal Palace)
- Jordan Pickford (Everton)
- Aaron Ramsdale (Arsenal)
- James Trafford (Burnley)

**Defenders:**
- Jarrad Branthwaite (Everton)
- Lewis Dunk (Brighton)
- Joe Gomez (Liverpool)
- Marc Guehi (Crystal Palace)
- Ezri Konsa (Aston Villa)
- Harry Maguire (Manchester United)
- Jarell Quansah (Liverpool)
- Luke Shaw (Manchester United)
- John Stones (Manchester City)
- Kieran Trippier (Newcastle)
- Kyle Walker (Manchester City)

**Midfielders:**
- Trent Alexander-Arnold (Liverpool)
- Conor Gallagher (Chelsea)
- Curtis Jones (Liverpool)
- Kobbie Mainoo (Manchester United)
- Declan Rice (Arsenal)
- Adam Wharton (Crystal Palace)

**Forwards:**
- Jude Bellingham (Real Madrid)
- Jarrod Bowen (West Ham)
- Eberechi Eze (Crystal Palace)
- Phil Foden (Manchester City)
- Jack Grealish (Manchester City)
- Anthony Gordon (Newcastle)
- Harry Kane (Bayern Munich)
- James Maddison (Tottenham)
- Cole Palmer (Chelsea)
- Bukayo Saka (Arsenal)
- Ivan Toney (Brentford)
- Ollie Watkins (Aston Villa)

**Notable players who didn't make the cut:**
- Marcus Rashford
- Jordan Henderson
- Ben Chilwell
- Eric Dier
- Reece James
- Jadon Sancho
- Dominic Solanke
- Raheem Sterling
- Ben White

Conclusion

If you take one thing away from this article, it is the idea that a RAG pipeline can, and arguably should, include a streaming data component. Information doesn’t stand still, and the faster you can collect, prepare, and make new information available for prompt engineering, the more accurate and relevant your chosen LLM’s generations will be.

The example presented here was effective but simple. We could have easily built the pipeline using just the LangChain and MongoDB components, and the result would have been the same, but that wasn’t the idea. News sites, social media applications, and other event-based systems can generate information at an incredible rate, and your RAG pipeline will reach an inflection point where you need to decouple the components to handle the sheer volume of data being generated.

Slotting a streaming data platform into your RAG architecture decouples the data collection, processing, and storage components. Tools like Redpanda provide a simple, fast, and durable event buffer that can underpin your end-to-end AI applications, future-proofing them as they scale.

To learn more about Redpanda Connect, check out this page.

Originally published on Medium by James Kinley
