Introduction
You’ll be able to simply create a easy utility that may chat with SQL Database. However right here’s the issue with that. You’ll be able to’t make it work seamlessly in terms of dealing with and dealing with giant databases. If the database is big, it’s impractical to incorporate the whole listing of columns and tables within the immediate context. This text explains how one can bypass this impediment by creating an AI utility that may chat with large SQL databases.
Creating an AI Utility to Chat with Huge SQL Databases
The next code initiates a easy Streamlit utility that allows customers to connect with an SQL database and chat with it.
import streamlit as st
import requests
import os
import pandas as pd
from uuid import uuid4
import psycopg2
from langchain.prompts import ChatPromptTemplate
from langchain.prompts.chat import SystemMessage, HumanMessagePromptTemplate
from langchain.llms import OpenAI, AzureOpenAI
from langchain.chat_models import ChatOpenAI, AzureChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from dotenv import load_dotenv
# Create essential folders
folders_to_create = ['csvs']
for folder_name in folders_to_create:
if not os.path.exists(folder_name):
os.makedirs(folder_name)
print(f"Folder '{folder_name}' created.")
else:
print(f"Folder '{folder_name}' already exists.")
# Load the OpenAI API key
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
llm = OpenAI(openai_api_key=openai_api_key)
chat_llm = ChatOpenAI(openai_api_key=openai_api_key, temperature=0.4)
embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)
def get_basic_table_details(cursor):
question = """
SELECT table_name, column_name, data_type
FROM information_schema.columns
WHERE table_name IN (
SELECT tablename FROM pg_tables WHERE schemaname="public"
);"""
cursor.execute(question)
return cursor.fetchall()
def save_db_details(db_uri):
unique_id = str(uuid4()).exchange("-", "_")
connection = psycopg2.join(db_uri)
cursor = connection.cursor()
tables_and_columns = get_basic_table_details(cursor)
df = pd.DataFrame(tables_and_columns, columns=['table_name', 'column_name', 'data_type'])
filename_t = f'csvs/tables_{unique_id}.csv'
df.to_csv(filename_t, index=False)
cursor.shut()
connection.shut()
return unique_id
def generate_template_for_sql(question, table_info, db_uri):
template = ChatPromptTemplate.from_messages([
SystemMessage(content=f"You are an assistant that can write SQL Queries. Given the text below, write a SQL query that answers the user's question. DB connection string is {db_uri} Here is a detailed description of the table(s): {table_info} Prepend and append the SQL query with three backticks '```'"),
HumanMessagePromptTemplate.from_template("{text}")
])
reply = chat_llm(template.format_messages(textual content=question))
return reply.content material
def get_the_output_from_llm(question, unique_id, db_uri):
filename_t = f'csvs/tables_{unique_id}.csv'
df = pd.read_csv(filename_t)
table_info = ''
for desk in df['table_name'].distinctive():
table_info += f'Details about desk {desk}:n'
table_info += df[df['table_name'] == desk].to_string(index=False) + 'nn'
return generate_template_for_sql(question, table_info, db_uri)
def execute_the_solution(answer, db_uri):
connection = psycopg2.join(db_uri)
cursor = connection.cursor()
_, final_query, _ = answer.cut up("```")
cursor.execute(final_query.strip())
consequence = cursor.fetchall()
return str(consequence)
def connect_with_db(uri):
st.session_state.db_uri = uri
st.session_state.unique_id = save_db_details(uri)
return {"message": "Database connection established!"}
def send_message(message):
answer = get_the_output_from_llm(message, st.session_state.unique_id, st.session_state.db_uri)
consequence = execute_the_solution(answer, st.session_state.db_uri)
return {"message": answer + "nn" + "End result:n" + consequence}
# Streamlit interface setup
st.subheader("Directions")
st.markdown("1. Enter your RDS Database URI under.n2. ")
The elemental technique for simplifying the immediate includes sending solely the related tables and column names that pertain to the person’s question. To attain this, we are able to generate embeddings for the desk and column names, dynamically retrieve probably the most pertinent ones primarily based on the person’s enter, and embrace these within the immediate. On this article, we’ll make the most of ChromaDB as our vector database. Nevertheless, options like Pinecone, Milvus, or any appropriate vector database can be utilized.
The right way to Simplify the Immediate
Let’s start by putting in ChromaDB.
pip set up chromadb
First, we’ll arrange a further folder named ‘vectors’ alongside the ‘csvs’ folder to retailer embeddings of desk and column names. This folder may also comprise different pertinent database particulars, such because the international keys that hyperlink totally different tables, and potential values for the WHERE clause.
def generate_embeddings(filename, storage_folder):
csv_loader = CSVLoader(file_path=filename, encoding="utf8")
dataset = csv_loader.load()
vector_database = Chroma.from_documents(dataset, embedding=embeddings, persist_directory=storage_folder)
vector_database.persist()
We may also first test whether or not the person’s question wants any details about tables or if the person is as a substitute asking about simply the overall schema of the database.
def check_user_intent_for_database_info_or_sql(question):
# Outline a template for the dialog
prompt_template = ChatPromptTemplate.from_messages([
SystemMessage(
content=(
"Based on the provided text, the user is asking a question about databases. "
"Determine if the user seeks information about the database schema or if they want to write a SQL query. "
"Respond with 'yes' if the user is seeking information about the database schema and 'no' if they intend to write a SQL query."
)
),
HumanMessagePromptTemplate.from_template("{text}"),
])
# Generate a response utilizing a language mannequin
response = chat_llm(prompt_template.format_messages(textual content=question))
print(response.content material)
return response.content material
The person responds with both ‘sure’ or ‘no’. If the reply is ‘sure’, a immediate is generated.
def generate_sql_query_prompt(question, db_uri):
# Configure the chat template with system and human messages
prompt_template = ChatPromptTemplate.from_messages([
SystemMessage(
content=(
"As an assistant tasked with writing SQL queries, create a SQL query based on the text below. "
"Enclose the SQL query within three backticks '```' for clarity. "
"Aim to use 'SELECT' queries as much as possible. "
f"The connection string for the database is {db_uri}."
)
),
HumanMessagePromptTemplate.from_template("{text}"),
])
# Generate and print the reply utilizing a language mannequin
response = chat_llm.prompt_template.format_messages(textual content=question))
print(response.content material)
return response.content material
If the reply is ‘no’, it signifies that the person’s question particularly requires the names of tables and columns inside these tables. We are going to then establish probably the most related tables & columns, and assemble a string from these to incorporate in our immediate.
Subsequent, we’ll confirm that our vectors have been efficiently created and that every one different elements are functioning appropriately. Beneath is the whole code up up to now.
import streamlit as st
import requests
import os
import pandas as pd
from uuid import uuid4
import psycopg2
from langchain.prompts import ChatPromptTemplate
from langchain.prompts.chat import SystemMessage, HumanMessagePromptTemplate
from langchain.llms import OpenAI, AzureOpenAI
from langchain.chat_models import ChatOpenAI, AzureChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from dotenv import load_dotenv
from langchain.vectorstores import Chroma
from langchain.document_loaders.csv_loader import CSVLoader
# Create essential folders for information storage
folders_to_create = ['csvs', 'vectors']
for folder_name in folders_to_create:
if not os.path.exists(folder_name):
os.makedirs(folder_name)
print(f"Folder '{folder_name}' created.")
else:
print(f"Folder '{folder_name}' already exists.")
# Load API key from setting variable
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
# Initialize language fashions and embeddings
llm = OpenAI(openai_api_key=openai_api_key)
chat_llm = ChatOpenAI(openai_api_key=openai_api_key, temperature=0.4)
embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)
# Operate to retrieve primary desk particulars from the database
def get_basic_table_details(cursor):
cursor.execute("""
SELECT c.table_name, c.column_name, c.data_type
FROM information_schema.columns c
WHERE c.table_name IN (
SELECT tablename
FROM pg_tables
WHERE schemaname="public"
);""")
return cursor.fetchall()
# Operate to create vector databases from CSV recordsdata
def create_vectors(filename, persist_directory):
loader = CSVLoader(file_path=filename, encoding="utf8")
information = loader.load()
vectordb = Chroma.from_documents(information, embedding=embeddings, persist_directory=persist_directory)
vectordb.persist()
# Operate to avoid wasting database particulars and generate vectors
def save_db_details(db_uri):
unique_id = str(uuid4()).exchange("-", "_")
connection = psycopg2.join(db_uri)
cursor = connection.cursor()
tables_and_columns = get_basic_table_details(cursor)
df = pd.DataFrame(tables_and_columns, columns=['table_name', 'column_name', 'data_type'])
filename_t="csvs/tables_" + unique_id + '.csv'
df.to_csv(filename_t, index=False)
create_vectors(filename_t, "./vectors/tables_"+ unique_id)
cursor.shut()
connection.shut()
return unique_id
# Operate to generate SQL question templates
def generate_template_for_sql(question, table_info, db_uri):
template = ChatPromptTemplate.from_messages([
SystemMessage(
content=(
f"You are an assistant that can write SQL Queries. Given the text below, write a SQL query that answers the user's question. DB connection string is {db_uri}. Here is a detailed description of the table(s): {table_info} Prepend and append the SQL query with three backticks '```'"
)
),
HumanMessagePromptTemplate.from_template("{text}"),
])
return chat_llm(template.format_messages(textual content=question)).content material
# Operate to find out if person's question is about normal schema data or SQL
def check_if_users_query_want_general_schema_information_or_sql(question):
template = ChatPromptTemplate.from_messages([
SystemMessage(
content=(
f"In the given text, the user is asking a question about the database. Determine whether the user wants information about the database schema or wants to write a SQL query. Answer 'yes' for schema information and 'no' for SQL query."
)
),
HumanMessagePromptTemplate.from_template("{text}"),
])
reply = chat_llm(template.format_messages(textual content=question))
print(reply.content material)
return reply.content material
# Operate to immediate when person desires normal database data
def prompt_when_user_want_general_db_information(question, db_uri):
template = ChatPromptTemplate.from_messages([
SystemMessage(
content=(
"You are an assistant who writes SQL queries. Given the text below, write a SQL query that answers the user's question. Prepend and append the SQL query with three backticks '```' Write select query whenever possible Connection string to this database is {db_uri}"
)
),
HumanMessagePromptTemplate.from_template("{text}"),
])
reply = chat_llm(template.format_messages(textual content=question))
print(reply.content material)
return reply.content material
# Operate to course of person queries and generate outputs primarily based on whether or not it is about normal schema or particular SQL question
def get_the_output_from_llm(question, unique_id, db_uri):
filename_t="csvs/tables_" + unique_id + '.csv'
df = pd.read_csv(filename_t)
table_info = ''
for desk in df['table_name'].distinctive():
table_info += 'Details about desk ' + desk + ':n'
table_info += df[df['table_name'] == desk].to_string(index=False) + 'nnn'
answer_to_question_general_schema = check_if_users_query_want_general_schema_information_or_sql(question)
if answer_to_question_general_schema == "sure":
return prompt_when_user_want_general_db_information(question, db_uri)
return generate_template_for_sql(question, table_info, db_uri)
# Operate to execute SQL options
def execute_the_solution(answer, db_uri):
connection = psycopg2.join(db_uri)
cursor = connection.cursor()
_, final_query, _ = answer.cut up("```")
final_query = final_query.strip('sql')
cursor.execute(final_query)
consequence = cursor.fetchall()
return str(consequence)
# Streamlit app setup and interplay dealing with
if __name__ == "__main__":
st.subheader("Directions")
st.markdown("""
1. Enter the URI of your RDS Database within the textual content field under.
2. Click on the **Begin Chat** button to start out the chat.
3. Enter your message within the textual content field under and press **Enter** to ship the message to the API.
""")
chat_history = []
uri = st.text_input("Enter the RDS Database URI")
if st.button("Begin Chat"):
if not uri:
st.warning("Please enter a legitimate database URI.")
else:
st.information("Connecting to the API and beginning the chat...")
chat_response = connect_with_db(uri)
if "error" in chat_response:
st.error("Error: Failed to start out the chat. Please test the URI and take a look at once more.")
else:
st.success("Chat began efficiently!")
st.subheader("Chat with the API")
if "messages" not in st.session_state:
st.session_state.messages = []
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.markdown(message["content"])
if immediate := st.chat_input("What's up?"):
st.chat_message("person").markdown(immediate)
st.session_state.messages.append({"position": "person", "content material": immediate})
response = send_message(immediate)["message"]
with st.chat_message("assistant"):
st.markdown(response)
st.session_state.messages.append({"position": "assistant", "content material": response})
st.write("This can be a easy Streamlit app for beginning a chat with an RDS Database.")
Within the subsequent step, we are going to carry out vector retrieval to establish probably the most related tables. As soon as these tables are chosen, we are going to collect all of the column particulars to offer further context for our immediate. We are going to then compile this data right into a string to incorporate within the immediate.
# Initialize the vector database for storing desk embeddings
vectordb = Chroma(embedding_function=embeddings, persist_directory=f"./vectors/tables_{unique_id}")
retriever = vectordb.as_retriever()
docs = retriever.get_relevant_documents(question)
print(docs)
# Amassing the related tables and their columns
relevant_tables = []
relevant_table_details = []
for doc in docs:
table_info = doc.page_content.cut up("n")
table_name = table_info[0].cut up(":")[1].strip()
column_name = table_info[1].cut up(":")[1].strip()
data_type = table_info[2].cut up(":")[1].strip()
relevant_tables.append(table_name)
relevant_table_details.append((table_name, column_name, data_type))
# Load information about all tables from a CSV file
filename_t = f'csvs/tables_{unique_id}.csv'
df = pd.read_csv(filename_t)
# Assemble a descriptive string for every related desk, together with all columns and their information varieties
table_info = ''
for desk in relevant_tables:
table_info += f'Details about desk {desk}:n'
table_info += df[df['table_name'] == desk].to_string(index=False) + 'nnn'
def create_sql_query_template(question, relevant_tables, table_info):
tables_list = ",".be a part of(relevant_tables)
chat_template = ChatPromptTemplate.from_messages([
SystemMessage(
content=(
f"As an assistant capable of composing SQL queries, please write a query that resolves the user's inquiry based on the text provided. "
f"Consider SQL tables named '{tables_list}'. "
f"Below is a detailed description of these table(s): "
f"{table_info}"
"Enclose the SQL query within three backticks '```' for proper formatting."
)
),
HumanMessagePromptTemplate.from_template("{text}"),
])
response = chat_llm(chat_template.format_messages(textual content=question))
print(response.content material)
return response.content material
One closing factor we are able to do is to offer details about international keys to the immediate.
import streamlit as st
import os
import pandas as pd
from uuid import uuid4
import psycopg2
from langchain.prompts import ChatPromptTemplate
from langchain.prompts.chat import SystemMessage, HumanMessagePromptTemplate
from langchain.llms import OpenAI
from langchain.chat_models import ChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from dotenv import load_dotenv
from langchain.vectorstores import Chroma
from langchain.document_loaders.csv_loader import CSVLoader
# Guarantee essential directories exist
folders_to_create = ['csvs', 'vectors']
for folder in folders_to_create:
os.makedirs(folder, exist_ok=True)
print(f"Listing '{folder}' checked or created.")
# Load setting and API keys
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
# Initialize language fashions and embeddings
language_model = OpenAI(openai_api_key=openai_api_key)
chat_language_model = ChatOpenAI(openai_api_key=openai_api_key, temperature=0.4)
embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)
def fetch_table_details(cursor):
sql = """
SELECT table_name, column_name, data_type
FROM information_schema.columns
WHERE table_schema="public";
"""
cursor.execute(sql)
return cursor.fetchall()
def fetch_foreign_key_details(cursor):
sql = """
SELECT conrelid::regclass AS table_name, conname AS foreign_key,
pg_get_constraintdef(oid) AS constraint_definition
FROM pg_constraint
WHERE contype="f" AND connamespace="public"::regnamespace;
"""
cursor.execute(sql)
return cursor.fetchall()
def create_vector_database(information, listing):
loader = CSVLoader(information=information, encoding="utf8")
document_data = loader.load()
vector_db = Chroma(embeddings, persist_directory=listing)
vector_db.from_documents(document_data)
vector_db.persist()
def save_database_details(uri):
unique_id = str(uuid4()).exchange("-", "_")
conn = psycopg2.join(uri)
cur = conn.cursor()
particulars = fetch_table_details(cur)
df = pd.DataFrame(particulars, columns=['table_name', 'column_name', 'data_type'])
csv_path = f'csvs/tables_{unique_id}.csv'
df.to_csv(csv_path, index=False)
create_vector_database(df, f"./vectors/tables_{unique_id}")
foreign_keys = fetch_foreign_key_details(cur)
fk_df = pd.DataFrame(foreign_keys, columns=['table_name', 'foreign_key', 'constraint_definition'])
fk_csv_path = f'csvs/foreign_keys_{unique_id}.csv'
fk_df.to_csv(fk_csv_path, index=False)
cur.shut()
conn.shut()
return unique_id
def generate_sql_query_template(question, db_uri):
template = ChatPromptTemplate.from_messages([
SystemMessage(
content=(
f"You are an assistant capable of composing SQL queries. Use the details provided to write a relevant SQL query for the question below. DB connection string is {db_uri}."
"Enclose the SQL query with three backticks '```'."
)
),
HumanMessagePromptTemplate.from_template("{text}"),
])
response = chat_language_model(template.format_messages(textual content=question))
return response.content material
# Streamlit utility setup
st.title("Database Interplay Instrument")
uri = st.text_input("Enter the RDS Database URI")
if st.button("Hook up with Database"):
if uri:
attempt:
unique_id = save_database_details(uri)
st.success(f"Linked to database and information saved with ID: {unique_id}")
besides Exception as e:
st.error(f"Failed to attach: {str(e)}")
else:
st.warning("Please enter a legitimate database URI.")
In comparable methods, we are able to maintain enhancing this utility by including fallbacks. In every fallback, we are able to maintain including further data.
Conclusion
This text presents a novel method to creating an AI utility able to seamlessly interacting with large SQL databases by means of chat. We’ve addressed the problem of dealing with giant databases the place it’s impractical to incorporate the whole listing of columns and tables within the immediate. Our proposed answer dynamically retrieves related desk and column names primarily based on person queries. We make sure the immediate contains solely pertinent data, enhancing person expertise and effectivity. That is accomplished by leveraging vector databases like ChromaDB for embedding era and retrieval.
We’ve demonstrated how one can streamline the interplay course of by means of step-by-step implementation and code examples. In the meantime, we additionally labored on repeatedly enhancing the appliance’s performance. With additional enhancements resembling incorporating international keys and extra fallbacks, this utility holds promise for numerous database interplay situations.