How to set up RAG for OpenAI agents using IRIS Vector DB in Python
In this article, I’ll walk you through an example of using InterSystems IRIS Vector DB to store embeddings and integrate them with an OpenAI agent.
To demonstrate this, we’ll create an OpenAI agent with knowledge of InterSystems technology. We’ll achieve this by storing embeddings of some InterSystems documentation in IRIS and then using IRIS vector search to retrieve relevant content—enabling a Retrieval-Augmented Generation (RAG) workflow.
Note: Section 1 details how to process text into embeddings. If you are only interested in IRIS vector search, you can skip ahead to Section 2.
Section 1: Embedding Data
Your embeddings are only as good as your data! To get the best results, you should prepare your data carefully. This may include:
- Cleaning the text (removing special characters or excess whitespace)
- Chunking the data into smaller pieces
- Other preprocessing techniques
For this example, the documentation is stored in simple text files that require minimal cleaning. However, we will divide the text into chunks to enable more efficient and accurate RAG.
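If your data does need cleanup, a small helper along these lines can be a starting point. This is purely illustrative (the exact rules depend on your source files) and is not required for the documentation used in this article:

import re

def clean_text(text: str) -> str:
    """Strip control characters and collapse excess whitespace."""
    text = re.sub(r"[^\x09\x0A\x20-\x7E\u00A0-\uFFFF]", "", text)  # drop stray control characters
    text = re.sub(r"[ \t]+", " ", text)    # collapse runs of spaces and tabs
    text = re.sub(r"\n{3,}", "\n\n", text) # limit consecutive blank lines
    return text.strip()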
Step 1: Chunking Text Files
Chunking text into manageable pieces benefits RAG systems in two ways:
- More accurate retrieval – embeddings represent smaller, more specific sections of text.
- More efficient retrieval – less text per query reduces cost and improves performance.
For this example, we’ll store the chunked text in Parquet files before uploading to IRIS (though you can use any approach, including direct upload).
Chunking Function
We’ll use RecursiveCharacterTextSplitter from langchain_text_splitters to split text strategically based on paragraph, sentence, and word boundaries.
- Chunk size: 300 tokens (larger chunks provide more context but increase retrieval cost)
- Chunk overlap: 50 tokens (helps maintain context across chunks)
from langchain_text_splitters import RecursiveCharacterTextSplitter
import tiktoken

def chunk_text_by_tokens(text: str, chunk_size: int, chunk_overlap: int, encoding_name: str = "cl100k_base") -> list[str]:
    """
    Chunk text prioritizing paragraph and sentence boundaries using
    RecursiveCharacterTextSplitter, measuring lengths in tokens so they match
    the chunk size and overlap settings above. Returns a list of chunk strings.
    """
    encoding = tiktoken.get_encoding(encoding_name)
    splitter = RecursiveCharacterTextSplitter(
        # Prioritize larger semantic units first, then fall back to smaller ones
        separators=["\n\n", "\n", ". ", " ", ""],
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        # Count tokens (not characters) so chunk_size/chunk_overlap are in tokens
        length_function=lambda t: len(encoding.encode(t)),
        is_separator_regex=False,
    )
    return splitter.split_text(text)
Next, we’ll use the chunking function to process one text file at a time and apply a tiktoken encoder to calculate token counts and generate metadata. This metadata will be useful later when creating embeddings and storing them in IRIS.
from pathlib import Path
import tiktoken
def chunk_file(path: Path, chunk_size: int, chunk_overlap: int, encoding_name: str = "cl100k_base") -> list[dict]:
"""
Read a file, split its contents into token-aware chunks, and return metadata for each chunk.
Returns a list of dicts with keys:
- filename
- relative_path
- absolute_path
- chunk_index
- chunk_text
- token_count
- modified_time
- size_bytes
"""
p = Path(path)
if not p.exists() or not p.is_file():
raise FileNotFoundError(f"File not found: {path}")
try:
text = p.read_text(encoding="utf-8", errors="replace")
except Exception as e:
raise RuntimeError(f"Failed to read file {p}: {e}")
# Prepare tokenizer for accurate token counts
try:
encoding = tiktoken.get_encoding(encoding_name)
except Exception as e:
raise ValueError(f"Invalid encoding name '{encoding_name}': {e}")
# Create chunks using provided chunker
chunks = chunk_text_by_tokens(text, chunk_size, chunk_overlap)
# File metadata
stat = p.stat()
from datetime import datetime, timezone
modified_time = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat()
absolute_path = str(p.resolve())
try:
relative_path = str(p.resolve().relative_to(Path.cwd()))
except Exception:
relative_path = p.name
# Build rows
rows: list[dict] = []
for idx, chunk in enumerate(chunks):
token_count = len(encoding.encode(chunk))
rows.append({
"filename": p.name,
"relative_path": relative_path,
"absolute_path": absolute_path,
"chunk_index": idx,
"chunk_text": chunk,
"token_count": token_count,
"modified_time": modified_time,
"size_bytes": stat.st_size,
})
return rows
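For a quick sanity check, you can run chunk_file on a single documentation file before processing everything (the path below is just one of the files from this example’s dataset):

rows = chunk_file(Path("Documentation/BusinessService/Adapters/FileInboundAdapters.txt"),
                  chunk_size=300, chunk_overlap=50)
print(f"{len(rows)} chunks; first chunk is {rows[0]['token_count']} tokens long")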
Step 2: Creating embeddings
You can generate embeddings using cloud providers (e.g., OpenAI) or local models via Ollama (e.g., nomic-embed-text). In this example, we’ll use OpenAI’s text-embedding-3-small model to embed each chunk and save the results back to Parquet for later ingestion into IRIS Vector DB.
import os
import sys

import pandas as pd
from openai import OpenAI
def embed_and_save_parquet(input_parquet_path: str, output_parquet_path: str):
"""
Loads a Parquet file, creates embeddings for the 'chunk_text' column using
OpenAI's small embedding model, and saves the result to a new Parquet file.
Args:
input_parquet_path (str): Path to the input Parquet file containing 'chunk_text'.
output_parquet_path (str): Path to save the new Parquet file with embeddings.

    Requires the OPENAI_API_KEY environment variable to be set.
    """
key = os.getenv("OPENAI_API_KEY")
if not key:
print("ERROR: OPENAI_API_KEY environment variable is not set.", file=sys.stderr)
sys.exit(1)
try:
# Load the Parquet file
df = pd.read_parquet(input_parquet_path)
# Initialize OpenAI client
client = OpenAI(api_key=key)
# Generate embeddings for each chunk_text
embeddings = []
for text in df['chunk_text']:
response = client.embeddings.create(
input=text,
model="text-embedding-3-small" # Using the small embedding model
)
embeddings.append(response.data[0].embedding)
# Add embeddings to the DataFrame
df['embedding'] = embeddings
# Save the new DataFrame to a Parquet file
df.to_parquet(output_parquet_path, index=False)
print(f"Embeddings generated and saved to {output_parquet_path}")
except FileNotFoundError:
print(f"Error: Input file not found at {input_parquet_path}")
except KeyError:
print("Error: 'chunk_text' column not found in the input Parquet file.")
except Exception as e:
print(f"An unexpected error occurred: {e}")
Step 3: Put the data processing together
Now it’s time to run the pipeline. In this example, we’ll load and chunk the Business Service documentation, generate embeddings, and write the results to Parquet for IRIS ingestion.
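The pipeline below calls a load_documentation_to_parquet helper that isn’t shown in full here; a minimal sketch, assuming it simply walks the input directory for .txt files, chunks each one with chunk_file, and writes the combined rows to a single Parquet file, might look like this:

from pathlib import Path
import pandas as pd

def load_documentation_to_parquet(input_dir: Path, output_file: Path,
                                  chunk_size: int, chunk_overlap: int,
                                  encoding_name: str = "cl100k_base") -> None:
    """Chunk every .txt file under input_dir and write all rows to one Parquet file."""
    rows: list[dict] = []
    for txt_file in sorted(Path(input_dir).rglob("*.txt")):
        rows.extend(chunk_file(txt_file, chunk_size, chunk_overlap, encoding_name))
    if not rows:
        raise ValueError(f"No .txt files found under {input_dir}")
    pd.DataFrame(rows).to_parquet(output_file, index=False)
    print(f"Wrote {len(rows)} chunks to {output_file}")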
CHUNK_SIZE_TOKENS = 300
CHUNK_OVERLAP_TOKENS = 50
ENCODING_NAME = "cl100k_base"
current_file_path = Path(__file__).resolve()
load_documentation_to_parquet(input_dir=current_file_path.parent / "Documentation" / "BusinessService",
output_file=current_file_path.parent / "BusinessService.parquet",
chunk_size=CHUNK_SIZE_TOKENS,
chunk_overlap=CHUNK_OVERLAP_TOKENS,
encoding_name=ENCODING_NAME)
embed_and_save_parquet(input_parquet_path=current_file_path.parent / "BusinessService.parquet",
output_parquet_path=current_file_path.parent / "BusinessService_embedded.parquet")
A row in our final business service Parquet file will look something like this:
{"filename":"FileInboundAdapters.txt","relative_path":"Documentation\\BusinessService\\Adapters\\FileInboundAdapters.txt","absolute_path":"C:\\Users\\…\\Documentation\\BusinessService\\Adapters\\FileInboundAdapters.txt","chunk_index":0,"chunk_text":"Settings for the File Inbound Adapter\nProvides reference information for settings of the file inbound adapter, EnsLib.File.InboundAdapterOpens in a new tab. You can configure these settings after you have added a business service that uses this adapter to your production.\nSummary","token_count":52,"modified_time":"2025-11-25T18:34:16.120336+00:00","size_bytes":13316,"embedding":[-0.02851865254342556,0.01860344596207142,…,0.0135544464207155]}
Section 2: Using IRIS Vector Search
Step 4: Upload Your Embeddings to IRIS
Choose the IRIS namespace and table name you’ll use to store embeddings. (The script below will create the table if it doesn’t already exist.) Then use the InterSystems IRIS Python DB-API driver to insert the chunks and their embeddings.
The function below reads a Parquet file containing chunk text and embeddings, normalizes the embedding column to a JSON-serializable list of floats, connects to IRIS, creates the destination table if it doesn’t exist (with a VECTOR(FLOAT, 1536) column, where 1536 is the number of dimensions in the embedding), and then inserts each row using TO_VECTOR(?) in a parameterized SQL statement. It commits the transaction on success, logs progress, and cleans up the connection, rolling back on database errors.
import iris # The InterSystems IRIS Python DB-API driver
import pandas as pd
import numpy as np
import json
from pathlib import Path
# --- Configuration ---
PARQUET_FILE_PATH = "your_embeddings.parquet"
IRIS_HOST = "localhost"
IRIS_PORT = 8881
IRIS_NAMESPACE = "VECTOR"
IRIS_USERNAME = "superuser"
IRIS_PASSWORD = "sys"
TABLE_NAME = "AIDemo.Embeddings" # Must match the table created in IRIS
EMBEDDING_DIMENSIONS = 1536 # Must match the dimensions for the embeddings you used
def upload_embeddings_to_iris(parquet_path: str):
"""
Reads a Parquet file with 'chunk_text' and 'embedding' columns
and uploads them to an InterSystems IRIS vector database table.
"""
# 1. Load data from the Parquet file using pandas
try:
df = pd.read_parquet(parquet_path)
if 'chunk_text' not in df.columns or 'embedding' not in df.columns:
print("Error: Parquet file must contain 'chunk_text' and 'embedding' columns.")
return
except FileNotFoundError:
print(f"Error: The file at {parquet_path} was not found.")
return
# Ensure embeddings are in a format compatible with TO_VECTOR function (list of floats)
# Parquet often saves numpy arrays as lists
if isinstance(df['embedding'].iloc[0], np.ndarray):
df['embedding'] = df['embedding'].apply(lambda x: x.tolist())
print(f"Loaded {len(df)} records from {parquet_path}.")
# 2. Establish connection to InterSystems IRIS
connection = None
try:
conn_string = f"{IRIS_HOST}:{IRIS_PORT}/{IRIS_NAMESPACE}"
connection = iris.connect(conn_string, IRIS_USERNAME, IRIS_PASSWORD)
cursor = connection.cursor()
print("Successfully connected to InterSystems IRIS.")
# Create embedding table if it doesn't exist
cursor.execute(f"""
CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
ID INTEGER IDENTITY PRIMARY KEY,
chunk_text VARCHAR(2500), embedding VECTOR(FLOAT, {EMBEDDING_DIMENSIONS})
)"""
)
# 3. Prepare the SQL INSERT statement
# InterSystems IRIS uses the TO_VECTOR function for inserting vector data via SQL
insert_sql = f"""
INSERT INTO {TABLE_NAME} (chunk_text, embedding)
VALUES (?, TO_VECTOR(?))
"""
# 4. Iterate and insert data
count = 0
for index, row in df.iterrows():
text = row['chunk_text']
# Convert the list of floats to a JSON string, which is required by TO_VECTOR when using DB-API
vector_json_str = json.dumps(row['embedding'])
cursor.execute(insert_sql, (text, vector_json_str))
count += 1
if count % 100 == 0:
print(f"Inserted {count} rows...")
# Commit the transaction
connection.commit()
print(f"Data upload complete. Total rows inserted: {count}.")
except iris.DBAPIError as e:
print(f"A database error occurred: {e}")
if connection:
connection.rollback()
except Exception as e:
print(f"An unexpected error occurred: {e}")
finally:
if connection:
connection.close()
print("Database connection closed.")
Example usage:
current_file_path = Path(__file__).resolve()
upload_embeddings_to_iris(current_file_path.parent / "BusinessService_embedded.parquet")
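After the upload, it can be reassuring to confirm the rows actually landed. A quick check, assuming the same connection settings as the upload script, might be:

connection = iris.connect(f"{IRIS_HOST}:{IRIS_PORT}/{IRIS_NAMESPACE}", IRIS_USERNAME, IRIS_PASSWORD)
cursor = connection.cursor()
cursor.execute(f"SELECT COUNT(*) FROM {TABLE_NAME}")
print("Rows in table:", cursor.fetchone()[0])
connection.close()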
Step 5: Create your embedding search functionality
Next, we’ll create a search function that embeds the user’s query, runs a vector similarity search in IRIS via the Python DB‑API, and returns the top‑k matching chunks from our embeddings table.
The example function below embeds the user’s query with the same OpenAI embedding model used in Section 1, converts the resulting vector into a comma-separated string compatible with IRIS’s TO_VECTOR function, and then runs a SELECT TOP k query ordered by VECTOR_DOT_PRODUCT so the most similar chunks are returned first. It collects the matching rows (ID and chunk text), rolls back and logs any database errors, and always closes the connection before returning the results.
import iris
from typing import List
import os
from openai import OpenAI
# --- Configuration ---
PARQUET_FILE_PATH = "your_embeddings.parquet"
IRIS_HOST = "localhost"
IRIS_PORT = 8881
IRIS_NAMESPACE = "VECTOR"
IRIS_USERNAME = "superuser"
IRIS_PASSWORD = "sys"
TABLE_NAME = "AIDemo.Embeddings" # Must match the table created in IRIS
EMBEDDING_DIMENSIONS = 1536
MODEL = "text-embedding-3-small"
def get_embedding(text: str, model: str, client) -> List[float]:
    # Coerce to str and normalize newlines before embedding
    payload = ("" if text is None else str(text)).replace("\n", " ")
    resp = client.embeddings.create(model=model, input=[payload], encoding_format="float")
    return resp.data[0].embedding
def search_embeddings(search: str, top_k: int):
print("-------RAG--------")
print(f"Searching IRIS vector store for: ", search)
key = os.getenv("OPENAI_API_KEY")
client = OpenAI(api_key=key)
# 2. Establish connection to InterSystems IRIS
connection = None
try:
conn_string = f"{IRIS_HOST}:{IRIS_PORT}/{IRIS_NAMESPACE}"
connection = iris.connect(conn_string, IRIS_USERNAME, IRIS_PASSWORD)
cursor = connection.cursor()
print("Successfully connected to InterSystems IRIS.")
        # Embed the query, then format the vector as a comma-separated string for TO_VECTOR
        embedding = get_embedding(search, model=MODEL, client=client)
        emb_str = ", ".join(str(float(x)) for x in embedding)
# Prepare the SQL SELECT statement
search_sql = f"""
SELECT TOP {top_k} ID, chunk_text FROM {TABLE_NAME}
ORDER BY VECTOR_DOT_PRODUCT((embedding), TO_VECTOR(('{emb_str}'), FLOAT)) DESC
"""
        cursor.execute(search_sql)
        results = cursor.fetchall()
except iris.DBAPIError as e:
print(f"A database error occurred: {e}")
if connection:
connection.rollback()
except Exception as e:
print(f"An unexpected error occurred: {e}")
finally:
if connection:
connection.close()
print("Database connection closed.")
print("------------RAG Finished-------------")
return results
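You can try the search function on its own before wiring it into an agent. For example (the query text here is just an illustration):

if __name__ == "__main__":
    matches = search_embeddings("How do I configure the file inbound adapter?", top_k=3)
    for doc_id, chunk_text in matches:
        print(f"[{doc_id}] {chunk_text[:120]}...")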
Step 6: Add RAG context to your agent
Now that you’ve:
- Chunked and embedded your documentation,
- Uploaded embeddings to an IRIS vector table,
- Built a search function for IRIS vector queries,
it’s time to put it all together into an interactive Retrieval-Augmented Generation (RAG) chat using the OpenAI Responses API. For this example, we’ll give the agent direct access to the search function (for more fine-grained control of the agent), though you could also do this with a library such as LangChain.
First, you’ll need to create the instructions for the agent and define the search function as a tool it can call:
import json
import os
from typing import Any, Dict, List

from openai import OpenAI

from Search import search_embeddings  # the IRIS vector search function from Step 5 (module name assumed)
# ---------------------------- Configuration ----------------------------
MODEL = os.getenv("OPENAI_RESPONSES_MODEL", "gpt-5-nano")
SYSTEM_INSTRUCTIONS = (
"You are a helpful assistant that answers questions about InterSystems "
"business services and related integration capabilities. You have access "
"to a vector database of documentation chunks about business services. "
"\n\n"
"Use the `search_business_docs` tool whenever the user asks about specific "
"settings, configuration options, or how to perform tasks with business "
"services. Ground your answers in the retrieved context, quoting or "
"summarizing relevant chunks. If nothing relevant is found, say so "
"clearly and answer from your general knowledge with a disclaimer."
)
# ---------------------------- Tool Definition ----------------------------
TOOLS = [
{
"type": "function",
"name": "search_business_docs",
"description": (
"Searches a vector database of documentation chunks related to "
"business services and returns the most relevant snippets."
),
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": (
"Natural language search query describing what you want "
"to know about business services."
),
},
"top_k": {
"type": "integer",
"description": (
"Maximum number of results to retrieve from the vector DB."
),
"minimum": 1,
"maximum": 10,
},
},
"required": ["query", "top_k"],
"additionalProperties": False,
},
"strict": True,
}
]
Now we need a small “router” function to let the model actually use our RAG tool.
call_rag_tool(name, args) receives a function call emitted by the OpenAI Responses API and routes it to our local implementation (the search_business_docs tool that wraps Search.search_embeddings). It takes the model’s query and top_k, runs the IRIS vector search, and returns a JSON‑encoded payload of the top matches (IDs and text snippets). This stringified JSON is important because the Responses API expects tool outputs as strings; by formatting the results predictably, we make it easy for the model to ground its final answer in the retrieved documentation. If an unknown tool name is requested, the function returns an error payload so the model can handle it gracefully.
def call_rag_tool(name: str, args: Dict[str, Any]) -> str:
"""Route function calls from the model to our local Python implementations.
Currently only supports the `search_business_docs` tool, which wraps
`Search.search_embeddings`.
The return value must be a string. We will JSON-encode a small structure
so the model can consume the results reliably.
"""
if name == "search_business_docs":
        query = args.get("query", "")
        top_k = int(args.get("top_k", 5))  # fall back to 5 results if the model omits top_k
results = search_embeddings(query, top_k)
# Expecting each row to be something like (ID, chunk_text)
formatted: List[Dict[str, Any]] = []
for row in results:
if not row:
continue
# Be defensive in case row length/structure changes
doc_id = row[0] if len(row) > 0 else None
text = row[1] if len(row) > 1 else None
formatted.append({"id": doc_id, "text": text})
payload = {"query": query, "results": formatted}
return json.dumps(payload, ensure_ascii=False)
# Unknown tool; return an error-style payload
return json.dumps({"error": f"Unknown tool name: {name}"})
Now that we have our RAG tool, we can start work on the chat loop logic. First, we need a helper to reliably pull the model’s final answer and any tool outputs from the OpenAI Responses API. extract_answer_and_sources(response) walks the response.output items containing the model’s messages and concatenates their text into a single answer string. It also collects any function_call_output payloads (the JSON we returned from our RAG tool), parses them, and exposes them as tool_context for transparency and debugging. The result is a compact structure: {"answer": ..., "tool_context": [...]}.
def extract_answer_and_sources(response: Any) -> Dict[str, Any]:
"""Extract a structured answer and optional sources from a Responses API object.
We don't enforce a global JSON response schema here. Instead, we:
- Prefer the SDK's `output_text` convenience when present
- Fall back to concatenating any `output_text` content parts
- Also surface any tool-call-output payloads we got back this turn as
`tool_context` for debugging/inspection.
"""
answer_text = ""
# Preferred: SDK convenience
if hasattr(response, "output_text") and response.output_text:
answer_text = response.output_text
else:
# Fallback: walk output items
parts: List[str] = []
for item in getattr(response, "output", []) or []:
if getattr(item, "type", None) == "message":
for c in getattr(item, "content", []) or []:
if getattr(c, "type", None) == "output_text":
parts.append(getattr(c, "text", ""))
answer_text = "".join(parts)
# Collect any function_call_output items for visibility
tool_context: List[Dict[str, Any]] = []
for item in getattr(response, "output", []) or []:
if getattr(item, "type", None) == "function_call_output":
try:
tool_context.append({
"call_id": getattr(item, "call_id", None),
"output": json.loads(getattr(item, "output", "")),
})
except Exception:
tool_context.append({
"call_id": getattr(item, "call_id", None),
"output": getattr(item, "output", ""),
})
return {"answer": answer_text.strip(), "tool_context": tool_context}
With the help of extract_answer_and_sources we can build the whole chat loop to orchestrate a two‑phase, tool‑calling conversation with the OpenAI Responses API. The chat_loop() function runs an interactive CLI: it collects the user’s question, sends a first request with system instructions and the search_business_docs tool, and then inspects any function_call items the model emits. For each function call, it executes our local RAG tool (call_rag_tool, which wraps search_embeddings) and appends the result back to the conversation as a function_call_output. It then makes a second request asking the model to use those tool outputs to produce a grounded answer, parses that answer via extract_answer_and_sources, and prints it. The loop maintains running context in input_items so each turn can build on prior messages and tool results.
def chat_loop() -> None:
"""Run an interactive CLI chat loop using the OpenAI Responses API.
The loop supports multi-step tool-calling:
- First call may return one or more `function_call` items
- We execute those locally (e.g., call search_embeddings)
- We send the tool outputs back in a second `responses.create` call
- Then we print the model's final, grounded answer
"""
key = os.getenv("OPENAI_API_KEY")
if not key:
raise RuntimeError("OPENAI_API_KEY is not set in the environment.")
client = OpenAI(api_key=key)
print("\nBusiness Service RAG Chat")
print("Type 'exit' or 'quit' to stop.\n")
# Running list of inputs (messages + tool calls + tool outputs) for context
input_items: List[Dict[str, Any]] = []
while True:
user_input = input("You: ").strip()
if not user_input:
continue
if user_input.lower() in {"exit", "quit"}:
print("Goodbye.")
break
# Add user message
input_items.append({"role": "user", "content": user_input})
# 1) First call: let the model decide whether to call tools
response = client.responses.create(
model=MODEL,
instructions=SYSTEM_INSTRUCTIONS,
tools=TOOLS,
input=input_items,
)
# Save model output items to our running conversation
input_items += response.output
# 2) Execute any function calls
# The Responses API returns `function_call` items in `response.output`.
for item in response.output:
if getattr(item, "type", None) != "function_call":
continue
name = getattr(item, "name", None)
raw_args = getattr(item, "arguments", "{}")
try:
args = json.loads(raw_args) if isinstance(raw_args, str) else raw_args
except json.JSONDecodeError:
args = {"query": user_input}
result_str = call_rag_tool(name, args or {})
# Append tool result back as function_call_output
input_items.append(
{
"type": "function_call_output",
"call_id": getattr(item, "call_id", None),
"output": result_str,
}
)
# 3) Second call: ask the model to answer using tool outputs
followup = client.responses.create(
model=MODEL,
instructions=(
SYSTEM_INSTRUCTIONS
+ "\n\nYou have just received outputs from your tools. "
+ "Use them to give a concise, well-structured answer."
),
tools=TOOLS,
input=input_items,
)
structured = extract_answer_and_sources(followup)
print("Agent:\n" + structured["answer"] + "\n")
That’s it! You’ve built a complete RAG pipeline powered by IRIS Vector Search. While this example focused on a simple use case, IRIS Vector Search opens the door to many more possibilities:
- Knowledge store for more complex customer support agents
- Conversational context storage for hyper-personalized agents
- Anomaly detection in textual data
- Clustering analysis for textual data
I hope this walkthrough gave you a solid starting point for exploring vector search and building your own AI-driven applications with InterSystems IRIS!
The full codebase can be found here: