Using PostgreSQL as a Vector Database for RAG
Did you know you can use PostgreSQL, the mighty relational database, as your vector store too? That means you don't need to rely on two separate products if you want to implement RAG in a solution that already uses this open-source database.
Here’s a quick overview of how you can make it happen.
To keep things simple, I’ve used TigerCloud (formerly Timescale) to spin up a PostgreSQL instance in the cloud.
Now, on to the code.
general.txt
For this demo, I’ve used the Q&A section of Python’s documentation, which you can find here: General Python FAQ — Python 3.13.5 documentation
You can either copy the text from that page, or click Download to grab the whole documentation set, then open the faq folder, take general.txt from there, and put it in your solution.
requirements.txt
Here are all the packages I needed:
openai
pandas
numpy
psycopg2-binary
pgvector
python-dotenv
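You can install them all in one go with pip install -r requirements.txt.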
.env
For the demo, I needed my OpenAI API key as well as the connection string from TigerCloud:
OPENAI_API_KEY=<YOUR OPENAI API KEY HERE>
TIMESCALE_CONNECTION_STRING=<YOUR TIMESCALE CONNECTION STRING HERE>
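Before wiring everything together, it’s worth a quick sanity check that the connection string actually works. Here’s a minimal throwaway script for that (my own addition, not part of the demo files; it assumes the two variables above are already in your .env):

import os
import psycopg2
from dotenv import load_dotenv, find_dotenv

load_dotenv(find_dotenv())

# Connect using the TigerCloud/Timescale connection string from .env
conn = psycopg2.connect(os.environ["TIMESCALE_CONNECTION_STRING"])
with conn.cursor() as cur:
    cur.execute("SELECT version();")
    print(cur.fetchone()[0])
conn.close()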
utils.py
import numpy as np
import openai
from pgvector.psycopg2 import register_vector
# Function to process input with retrieval of the most similar documents from the database
def process_input_with_retrieval(user_input, conn):
    delimiter = "```"

    # Step 1: Get documents related to the user input from the database
    related_docs = get_top3_similar_docs(get_embeddings(user_input), conn)

    # Step 2: Get completion from the OpenAI API
    # Set the system message to help set an appropriate tone and context for the model
    system_message = f"""
    You are a friendly chatbot. \
    You can answer questions about Python, the programming language. \
    You respond in a concise, technically credible tone. Always use ONLY the data from the context and DO NOT PROVIDE any additional information. If you can't find the answer in the context, say "I don't know". Even if you find something that you think is not correct, but if it is in the context - say it.
    """

    # Prepare messages to pass to the model
    # We use a delimiter to help the model understand where the user_input starts and ends
    messages = [
        {"role": "system", "content": system_message},
        {"role": "user", "content": f"{delimiter}{user_input}{delimiter}"},
        {"role": "assistant", "content": f"Relevant Python information: \n {related_docs[0][0]} \n {related_docs[1][0]} \n {related_docs[2][0]}"}
    ]
    final_response = get_completion_from_messages(messages)
    return final_response
# Helper function: get a text completion from the OpenAI API
def get_completion_from_messages(messages, model="gpt-4o", temperature=0, max_tokens=1000):
    openai_client = openai.OpenAI()
    response = openai_client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=temperature,
        max_tokens=max_tokens,
    )
    return response.choices[0].message.content
# Helper function: get the top 3 most similar documents from the database
def get_top3_similar_docs(query_embedding, conn):
    embedding_array = np.array(query_embedding)
    # Register the pgvector type adapters for this connection
    register_vector(conn)
    cur = conn.cursor()
    # Get the top 3 most similar documents using the cosine distance operator (<=>)
    cur.execute("SELECT content FROM embeddings ORDER BY embedding <=> %s LIMIT 3", (embedding_array,))
    top3_docs = cur.fetchall()
    return top3_docs
# Load text file content (duh!)
def load_text_file(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        return file.read()
# Function to chunk text into smaller pieces
def chunk_text(text, num_chunks):
    """
    Split text into a specified number of chunks.

    Args:
        text (str): The text to be chunked
        num_chunks (int): Number of chunks to create

    Returns:
        list: List of text chunks
    """
    if num_chunks <= 0:
        raise ValueError("Number of chunks must be greater than 0")
    if not text:
        return [""] * num_chunks

    text_length = len(text)
    chunk_size = text_length // num_chunks
    remainder = text_length % num_chunks

    chunks = []
    start = 0
    for i in range(num_chunks):
        # Add one extra character to the first 'remainder' chunks
        current_chunk_size = chunk_size + (1 if i < remainder else 0)
        end = start + current_chunk_size
        chunks.append(text[start:end])
        start = end

    return chunks
# Function to get the embedding for a given text using the OpenAI API
def get_embeddings(text):
    openai_client = openai.OpenAI()
    response = openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=text.replace("\n", " ")
    )
    return response.data[0].embedding
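One thing to be aware of: chunk_text splits purely by character count, so chunk boundaries can land in the middle of a sentence or even a word. That’s fine for a quick demo, but for real documents you’ll probably want sentence- or paragraph-aware chunking. Here’s a tiny illustrative snippet (not part of the demo files) that shows the behaviour:

from utils import chunk_text

sample = "Python is portable. It runs on many platforms. It is also open source."
for chunk in chunk_text(sample, 3):
    print(repr(chunk))
# Each chunk is roughly a third of the characters, regardless of sentence boundaries.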
main.py
from dotenv import load_dotenv, find_dotenv
import os
import openai
import pandas as pd
import numpy as np
import psycopg2
from utils import chunk_text, get_embeddings, load_text_file, process_input_with_retrieval
from psycopg2.extras import execute_values
from pgvector.psycopg2 import register_vector
_ = load_dotenv(find_dotenv())
openai.api_key = os.environ['OPENAI_API_KEY']
new_list = []
text = load_text_file('general.txt')
# You can change the number of chunks here based on your needs and the size of the text
chunks = chunk_text(text, 5)
for i in range(len(chunks)):
    embedding = get_embeddings(chunks[i])
    new_list.append([chunks[i], embedding])
df = pd.DataFrame(new_list, columns=['content', 'embedding'])
print(f"Created DataFrame with {len(df)} rows")
connection_string = os.environ['TIMESCALE_CONNECTION_STRING']
# Connect to PostgreSQL database in Timescale using connection string
conn = psycopg2.connect(connection_string)
cur = conn.cursor()
# Install the pgvector extension
cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
conn.commit()
# Install the pgvectorscale extension
cur.execute("CREATE EXTENSION IF NOT EXISTS vectorscale CASCADE;")
conn.commit()
# Create table to store embeddings and metadata
table_create_command = """
-- Remove the embeddings table if it already exists
DROP TABLE IF EXISTS embeddings;

CREATE TABLE embeddings (
    id bigserial PRIMARY KEY,
    content text,
    embedding vector(1536)
);
"""
cur.execute(table_create_command)
cur.close()
conn.commit()
# Batch insert the embeddings and metadata from the dataframe into the PostgreSQL database
register_vector(conn)
cur = conn.cursor()
# Prepare the list of tuples to insert
data_list = [(row['content'], np.array(row['embedding'])) for _, row in df.iterrows()]
# Use execute_values to perform batch insertion
execute_values(cur, "INSERT INTO embeddings (content, embedding) VALUES %s", data_list)
# Commit after we insert all embeddings
conn.commit()
# Create an index on the data for faster retrieval
cur.execute('CREATE INDEX embedding_idx ON embeddings USING diskann (embedding);')
conn.commit()
user_input = 'Is Python portable?'
response = process_input_with_retrieval(user_input, conn)
print(user_input)
print(response)
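A note on the index: the diskann index type comes from the pgvectorscale extension, so the CREATE EXTENSION vectorscale step has to succeed for it to work. If you’re running on a plain PostgreSQL instance without pgvectorscale, one fallback (just a sketch, not what I used in the demo) is to skip that extension and build pgvector’s built-in HNSW index instead, using the cosine operator class so it matches the <=> operator in utils.py:

# Fallback if pgvectorscale is unavailable: pgvector's built-in HNSW index
# with cosine distance, matching the <=> operator in get_top3_similar_docs
cur.execute(
    "CREATE INDEX IF NOT EXISTS embedding_hnsw_idx "
    "ON embeddings USING hnsw (embedding vector_cosine_ops);"
)
conn.commit()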
Here’s the outcome:
You can play around with this by changing the input, the data, etc.
All in all, it works great. And the fact that you are using one of the most widely used relational databases as your vector store for RAG is super convenient.
Thanks: