Using OpenAI and MongoDB to generate vector embeddings on streaming text

Streaming text embeddings for Retrieval Augmented Generation (RAG) with Redpanda

Redpanda Data
7 min read · Jun 5, 2024

Author: James Kinley

This isn’t just another post about Retrieval Augmented Generation (RAG). It focuses on how to create text embeddings on streaming data. I’ll provide an example of how to stream documents into Redpanda (a supercharged Kafka-compatible streaming data platform) and then enrich those documents with OpenAI text embeddings using Redpanda Connect, a new product that we announced after Redpanda acquired Benthos (a simple stream processing framework).

The example then adds the enriched documents to a vector database, which is used to complete the retrieval pipeline by querying for relevant texts to add context to a Large Language Model (LLM) prompt. Before we dig in, let’s establish some context.

A quick run through RAG

OpenAI defines RAG as follows:

“RAG is the process of retrieving relevant contextual information from a data source and passing that information to a large language model alongside the user’s prompt. This information is used to improve the model’s output by augmenting the model’s base knowledge.”

RAG comprises two parts:

  1. The acquisition and persistence of new information that an LLM has no prior knowledge of. This new information (documents, webpages, etc.) is split into smaller chunks of text and stored in a vector database alongside its vector embeddings. This is the part we’re going to use stream processing for.
  2. The retrieval of relevant contextual information from the vector database (semantic search), and the passing of that context alongside a user prompt to the LLM to improve the model’s generated answer.

Vector embeddings are a mathematical representation of text that encodes its semantic meaning in a multidimensional vector space. Words with a similar meaning are clustered closer together in this space (i.e., they have similar coordinates). Vector databases can query vector embeddings to retrieve texts with a similar semantic meaning. A famous example of this, as illustrated by Grant Sanderson, is king - man + woman = queen.
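To make "closer together" a little more concrete, here is a minimal sketch of cosine similarity, one common way to compare embeddings. The three-dimensional vectors are made up for illustration (real embeddings have hundreds or thousands of dimensions), and the helper names are hypothetical rather than part of the example repository.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors (1.0 = same direction)."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine similarity is undefined for a zero vector")
    return dot / (norm_a * norm_b)


# Hypothetical toy "embeddings", invented for this sketch only.
toy_embeddings = {
    "sprint": [0.9, 0.1, 0.0],
    "race": [0.8, 0.2, 0.1],
    "database": [0.0, 0.1, 0.9],
}


def most_similar(word, embeddings):
    """Return the other word whose vector points in the most similar direction."""
    target = embeddings[word]
    scored = [
        (other, cosine_similarity(target, vector))
        for other, vector in embeddings.items()
        if other != word
    ]
    return max(scored, key=lambda pair: pair[1])[0]


print(most_similar("sprint", toy_embeddings))
```

With these toy vectors, "sprint" lands next to "race" rather than "database", which is the same idea a vector database applies at scale during semantic search.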

Now that the context has been set (pun intended), let's walk through an example of how to build the stream processing pipeline. All the code included in this article can be found in this GitHub repository: jrkinley/benthos-embeddings.

Building the stream processing pipeline with Redpanda, OpenAI, and MongoDB

There are a few prerequisites to get this working, but all the instructions are provided in the README, and they’re easy to follow. In summary, the example uses three online services and we need authentication credentials for each of them:

  1. OpenAI API: for requesting text embeddings and model generations.
  2. Redpanda Serverless: for fast streaming of text documents.
  3. MongoDB Atlas: for vector storage and semantic search.

Clone the GitHub repository and add the prerequisite credentials to a file named .env in the demo directory as follows:

REDPANDA_SERVERS="<bootstrap_server_url>"
REDPANDA_USER="<username>"
REDPANDA_PASS="<password>"
REDPANDA_TOPICS="documents"

OPENAI_API_KEY="<secret_key>"
OPENAI_EMBEDDING_MODEL="text-embedding-3-small"
OPENAI_MODEL="gpt-4o"

ATLAS_CONNECTION_STRING="mongodb+srv://<username>:<password>@<...>.mongodb.net/?retryWrites=false"
ATLAS_DB="VectorStore"
ATLAS_COLLECTION="Embeddings"
ATLAS_INDEX="vector_index"

Before we get started in earnest, the final step to complete the setup is to create a Python virtual environment and install the packages listed in requirements.txt:

% cd demo
% python3 -m venv env
% source env/bin/activate
(env) % pip install -r requirements.txt

1. Query ChatGPT for a baseline

The goal here is to demonstrate how to use streaming text to improve the generated responses coming from ChatGPT. On 13 May 2024, OpenAI released its latest model, gpt-4o, which, at the time of writing, has been trained on data up to October 2023.

So, the model is unlikely to acknowledge current affairs past this date and consequently will not generate a meaningful response to a question that references recent events, like the Paris Olympic Games. This is why RAG exists!

To begin, let’s ask ChatGPT to answer a question about something that happened after its training cutoff date (i.e., something it hasn’t learned about): “Name the athletes and their nations that won Gold, Silver, and Bronze in the men’s 100m at the Paris Olympic games.”

# Activate the Python environment
source env/bin/activate

# Prompt OpenAI without any context
python openai_rag.py -v \
-q "Name the athletes and their nations that won Gold, Silver, and Bronze in the mens 100m at the Paris olympic games."

# gpt-4o doesn't know the answer
Question: Name the athletes and their nations that won Gold, Silver, and
Bronze in the mens 100m at the Paris olympic games.

Answer: I don't know.

As anticipated, gpt-4o has no prior knowledge of the Paris Olympic Games and cannot answer the question. Now, let’s build the streaming RAG pipeline to provide the model with some pseudo-real-time intuition and see if it can generate a good answer.

2. Stream text into Redpanda with LangChain

LangChain is a framework and collection of tools for building AI applications. In this example, we use LangChain’s WebBaseLoader to load an HTML page from the BBC Sport website [4] that reports on the men’s 100m sprint final.

The text is split into smaller chunks using LangChain’s RecursiveCharacterTextSplitter to make sure it fits into ChatGPT’s context window, before the splits are sent to Redpanda ready for adding text embeddings.

# Run the LangChain application
python produce_documents.py \
-u "https://www.bbc.co.uk/sport/olympics/articles/clwyy8jwp2go"

Prepared 6 documents
Producing to Kafka: 100%|################| 6/6 [00:00<00:00, 12633.45it/s]
Sent 6 documents to Kafka topic: documents
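For a feel of what the splitting step does, here is a deliberately simplified sketch: a fixed-size character splitter with overlap. It is not LangChain's RecursiveCharacterTextSplitter (which also tries to break on separators such as paragraphs and sentences), and the function name and default sizes are assumptions chosen for illustration.

```python
def split_text(text, chunk_size=1000, chunk_overlap=100):
    """Split text into chunks of at most chunk_size characters.

    Consecutive chunks share chunk_overlap characters so that a sentence
    cut at a boundary still appears in full in at least one chunk.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in the range [0, chunk_size)")

    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once the current chunk has reached the end of the text.
        if start + chunk_size >= len(text):
            break
    return chunks


print(split_text("abcdefghij", chunk_size=4, chunk_overlap=1))
```

Each chunk would then be sent to the documents topic as its own message, which is what lets the embedding step downstream process chunks independently.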

3. Generate text embeddings with Redpanda Connect

With the text splits now streaming into a Redpanda topic, the next step is to use Redpanda Connect to consume the events from the documents topic and pass the text values through a processor that calls OpenAI’s embeddings API to retrieve the vector embeddings. The enriched events (text + embeddings + metadata) are then inserted into a MongoDB Atlas database collection, which is indexed and made available for semantic search.

# Set the environment variables
export $(grep -v '^#' .env | xargs)

# Start the Redpanda Connect pipeline
rpk connect run --log.level debug openai_rpcn.yaml

INFO Running main config from specified file @service=redpanda-connect benthos_version=4.34.0 path=openai_rpcn.yaml
INFO Listening for HTTP requests at: http://0.0.0.0:4195 @service=redpanda-connect
INFO Launching a Redpanda Connect instance, use CTRL+C to close @service=redpanda-connect
INFO Input type kafka is now active @service=redpanda-connect label="" path=root.input
DEBU Starting consumer group @service=redpanda-connect label="" path=root.input
INFO Output type mongodb is now active @service=redpanda-connect label="" path=root.output
DEBU Consuming messages from topic 'documents' partition '0' @service=redpanda-connect label="" path=root.input

Note that the Redpanda Connect configuration openai_rpcn.yaml uses variable interpolation, so it’s important not to skip over the export command that sets the environment variables listed in .env.
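If you want to catch a missing variable before starting the pipeline, a small pre-flight check can help. The sketch below scans a config for `${VAR}` and `${VAR:default}` references and reports any that are unset and have no default. The config fragment is hypothetical and is not the contents of openai_rpcn.yaml; the helper name is also an assumption.

```python
import re

# Matches ${NAME} and ${NAME:default}. Function interpolations such as
# ${! ... } are intentionally not matched because they do not start with a name.
_VARIABLE = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::([^}]*))?\}")


def missing_variables(config_text, env):
    """Return referenced variable names that are unset and have no default."""
    missing = []
    for match in _VARIABLE.finditer(config_text):
        name, default = match.group(1), match.group(2)
        if name not in env and default is None and name not in missing:
            missing.append(name)
    return missing


# Hypothetical fragment, for illustration only.
example_config = """
addresses: ["${REDPANDA_SERVERS}"]
topics: ["${REDPANDA_TOPICS:documents}"]
"""

print(missing_variables(example_config, {}))
```

To check the real file, you could pass `open("openai_rpcn.yaml").read()` and `os.environ` (after `import os`) instead of the example values.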

This completes the first part of the RAG pipeline. Text is now streaming into Redpanda, being enriched with text embeddings from OpenAI, and inserted into the vector store, where the database contains documents that look like this:

_id: 6700126c28b54c3c6e0c30d4
embedding: Array (1536)
metadata: Object
  description: "Noah Lyles won Olympic men's 100m gold by five-thousandths of a second…"
  language: "en-GB"
  source: "https://www.bbc.co.uk/sport/olympics/articles/clwyy8jwp2go"
  title: "Olympics men's 100m final: How Noah Lyles won the greatest race in his…"
text: "Olympics men's 100m final: How Noah Lyles won the greatest race in his…"

The embedding field is an array of 1,536 doubles, which is the length of the embedding vector for OpenAI’s text-embedding-3-small model.
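The enrichment step can be pictured as a small function that takes a message, attaches an embedding, and checks the vector length before the document is written. This is only a sketch in Python (the actual pipeline does this work in Redpanda Connect): `fake_embed` is a hypothetical stand-in for the OpenAI embeddings call that returns deterministic but meaningless numbers, and the field names follow the document shown above.

```python
import hashlib

EMBEDDING_DIMENSIONS = 1536  # vector length quoted above for text-embedding-3-small


def fake_embed(text, dimensions=EMBEDDING_DIMENSIONS):
    """Hypothetical stand-in for an embeddings API call (not semantic)."""
    values = []
    counter = 0
    while len(values) < dimensions:
        digest = hashlib.sha256(f"{counter}:{text}".encode("utf-8")).digest()
        values.extend(byte / 255.0 for byte in digest)
        counter += 1
    return values[:dimensions]


def enrich(message, embed=fake_embed):
    """Return a new document with text, metadata, and a validated embedding."""
    vector = embed(message["text"])
    if len(vector) != EMBEDDING_DIMENSIONS:
        raise ValueError(
            f"expected {EMBEDDING_DIMENSIONS} dimensions, got {len(vector)}"
        )
    return {
        "text": message["text"],
        "metadata": message.get("metadata", {}),
        "embedding": vector,
    }


document = enrich({"text": "example chunk", "metadata": {"language": "en-GB"}})
print(len(document["embedding"]))
```

Validating the length matters because a vector index is defined for a fixed number of dimensions, so a vector from a different embedding model would not be searchable alongside the others.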

4. Augmented model generation

The final piece of the puzzle that brings this all together is to resend the original question to ChatGPT, but this time add some relevant context to the prompt by retrieving similar texts from the vector store. Again, the example uses LangChain to help us build the retrieval chain.
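Stripped of the framework, the retrieval step amounts to ranking stored chunks by similarity to the question's embedding and pasting the best ones into the prompt. The sketch below uses an in-memory list in place of MongoDB Atlas, made-up three-dimensional vectors, and a hypothetical prompt template; none of these names come from the example repository.

```python
import math


def _cosine(a, b):
    """Cosine similarity between two non-zero vectors of equal length."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (
        math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    )


def retrieve(query_vector, documents, k=2):
    """Return the k documents whose embeddings are most similar to the query."""
    ranked = sorted(
        documents,
        key=lambda doc: _cosine(query_vector, doc["embedding"]),
        reverse=True,
    )
    return ranked[:k]


def build_prompt(question, context_docs):
    """Combine retrieved chunks and the question into a single prompt string."""
    context = "\n\n".join(doc["text"] for doc in context_docs)
    return (
        "Answer the question using only the context below. "
        "If the context is not sufficient, say \"I don't know.\"\n\n"
        f"Context:\n{context}\n\nQuestion: {question}"
    )


# Toy in-memory "vector store" with placeholder texts and invented vectors.
store = [
    {"text": "Chunk about the men's 100m final.", "embedding": [0.9, 0.1, 0.0]},
    {"text": "Chunk about swimming.", "embedding": [0.1, 0.9, 0.0]},
    {"text": "Chunk about databases.", "embedding": [0.0, 0.1, 0.9]},
]
query_vector = [1.0, 0.0, 0.0]  # hypothetical embedding of the question

top = retrieve(query_vector, store, k=1)
print(build_prompt("Who won the men's 100m?", top))
```

In the real pipeline the ranking is done by the vector index in the database, and the question is embedded with the same model that was used for the stored chunks.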

Do we get a better answer?

# Prompt OpenAI with additional context from the vector store
python openai_rag.py -v \
-q "Name the athletes and their nations that won Gold, Silver, and Bronze in the mens 100m at the Paris olympic games."

Question: Name the athletes and their nations that won Gold, Silver, and
Bronze in the mens 100m at the Paris olympic games.

Answer: Gold was won by Noah Lyles from the USA, Silver by Kishane Thompson
from Jamaica, and Bronze by Fred Kerley, also from the USA.

Yes!

Conclusion

If you take one thing away from this post, it is the idea that a RAG pipeline can, and arguably should, include a streaming data component. Information doesn’t stand still, and the faster you can collect, prepare, and make new information available for prompt engineering, the more accurate and relevant your chosen LLM’s generations will be.

The example presented here was effective but simple. We could have easily built the pipeline using just the LangChain and MongoDB components, and the result would have been the same, but that wasn’t the idea. News sites, social media applications, and other event-based systems can generate information at an incredible rate, and your RAG architecture will reach an inflection point where you need to decouple and scale the components to handle the sheer volume of data being generated.

Slotting a streaming data platform into your RAG architecture decouples the data collection, processing, and storage components. Tools like Redpanda provide a simple, fast, and durable event buffer that can underpin your end-to-end AI applications, future-proofing them as they scale.

To learn more about what you can do with Redpanda Connect, check out our blog post on 10 use cases made easy with Redpanda Connect.

Originally published on Medium by James Kinley
