
Large Language Models (LLMs) like OpenAI’s ChatGPT are very large in size and complexity. They are at the center of numerous applications, ranging from chatbots to robust generative tasks.
While they have an impressive ability to understand and output human-like text, they also present challenges when deployed in production — particularly in terms of latency and computational cost. A semantic cache layer addresses many of these challenges in LLM production workloads.
What is a semantic cache layer?
A semantic cache not only stores previous results like a traditional cache, but it also understands the semantic meaning of the query. This means that queries or questions that are not exact matches can still provide previous answers if the intent of the request is the same.
This is crucial for LLM production workloads for a number of reasons:
- Repetitive queries. Users asking very similar questions do not need to repeatedly invoke the LLM.
- Reduced latency. A single call to the model can be resource intensive and time consuming. A semantic cache can answer questions almost instantaneously for a better user experience.
- Scalability. Handling a growing number of simultaneous requests can strain the system, and a semantic cache significantly reduces the demand for computational resources.
- Cost. Lower operational costs by reducing the number of calls to the model.
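To make the idea concrete, here is a minimal in-memory sketch of a semantic cache. It is only an illustration, not the implementation built later in this post: `toy_embed` is a hypothetical bag-of-words stand-in for a real embedding model, and `SemanticCache`, `answer_question` and the 0.8 threshold are names and values assumed for this example.

```python
import math
from collections import Counter


def toy_embed(text):
    """Hypothetical stand-in for an embedding model: unit-length word counts."""
    counts = Counter(text.lower().split())
    norm = math.sqrt(sum(c * c for c in counts.values()))
    if norm == 0:
        return {}
    return {word: c / norm for word, c in counts.items()}


def dot(a, b):
    """Dot product of two sparse vectors stored as dicts."""
    return sum(weight * b.get(word, 0.0) for word, weight in a.items())


class SemanticCache:
    def __init__(self, threshold):
        self.threshold = threshold
        self.entries = []  # list of (question embedding, answer) pairs

    def lookup(self, question):
        """Return the answer of the closest stored question, or None on a miss."""
        query = toy_embed(question)
        best_score, best_answer = 0.0, None
        for embedding, answer in self.entries:
            score = dot(query, embedding)
            if score > best_score:
                best_score, best_answer = score, answer
        return best_answer if best_score >= self.threshold else None

    def store(self, question, answer):
        self.entries.append((toy_embed(question), answer))


def answer_question(cache, question, expensive_model):
    """Serve from the cache when possible; otherwise call the model and cache the result."""
    cached = cache.lookup(question)
    if cached is not None:
        return cached, True
    answer = expensive_model(question)
    cache.store(question, answer)
    return answer, False
```

With this sketch, asking "describe the database" and then "describe database" calls the model once: the second question scores about 0.82 against the first and is served from the cache. A real embedding model captures meaning rather than shared words, which is why a much stricter threshold is practical later on.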
Why use SingleStoreDB as the semantic cache layer?
SingleStoreDB is a real-time, distributed database designed for blazing fast queries with an architecture that supports a hybrid model for transactional and analytical workloads. This pairs nicely with generative AI use cases as it allows for reading or writing data for both training and real-time tasks — without adding complexity and data movement from multiple products for the same task. SingleStoreDB also has a built-in plancache to speed up subsequent queries with the same plan.
Let's build this!
Tables
To illustrate this, we have a stock ticker data table:
CREATE TABLE stock_table (
    ticker varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
    created_at datetime DEFAULT NULL,
    `open` float DEFAULT NULL,
    `high` float DEFAULT NULL,
    `low` float DEFAULT NULL,
    `close` float DEFAULT NULL,
    volume int(11) DEFAULT NULL,
    SORT KEY (ticker, created_at desc),
    SHARD KEY (ticker)
);
The table is populated with about 6 million records. Here are five random rows:

There is also a table to store the vector embeddings:
CREATE TABLE embeddings (
    id bigint(11) NOT NULL AUTO_INCREMENT,
    category varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
    question longtext CHARACTER SET utf8 COLLATE utf8_general_ci,
    question_embedding longblob,
    answer longtext CHARACTER SET utf8 COLLATE utf8_general_ci,
    answer_embedding longblob,
    created_at datetime DEFAULT NULL,
    UNIQUE KEY `PRIMARY` (id) USING HASH,
    SHARD KEY __SHARDKEY (id),
    SORT KEY __UNORDERED ()
);
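The embedding columns are plain `longblob` values, so each vector is stored as packed bytes. The code in this post builds them with `np.array(x, '<f4')`, which means little-endian 32-bit floats. As a rough sketch of what that encoding looks like, here is the same packing done with only the standard library. The helper names `pack_vector` and `unpack_vector` are assumptions made for this example, not part of any SingleStoreDB API.

```python
import struct


def pack_vector(values):
    """Pack floats into a little-endian float32 blob (4 bytes per value)."""
    return struct.pack(f"<{len(values)}f", *values)


def unpack_vector(blob):
    """Decode a little-endian float32 blob back into a list of floats."""
    count = len(blob) // 4
    return list(struct.unpack(f"<{count}f", blob))


blob = pack_vector([0.5, -1.0, 2.0])
print(len(blob))            # 3 values * 4 bytes each
print(unpack_vector(blob))
```

At four bytes per dimension, a 1,536-dimension `text-embedding-ada-002` vector takes 6,144 bytes per row in each embedding column.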
import os
import time
import getpass
from datetime import datetime

import numpy as np
import openai
import singlestoredb as s2
from sqlalchemy import create_engine
from langchain.agents import create_sql_agent
from langchain.agents.agent_toolkits import SQLDatabaseToolkit
from langchain.sql_database import SQLDatabase
from langchain.llms.openai import OpenAI
from langchain.agents import AgentExecutor
from openai.embeddings_utils import get_embeddings

apikey = getpass.getpass("Enter openai apikey here")
os.environ["OPENAI_API_KEY"] = apikey

model = 'text-embedding-ada-002'
table_name = 'embeddings'

# connection_url is assumed to already hold your SingleStoreDB connection string
s2_conn = s2.connect(connection_url)
Agent executor
# Create the agent executor
db = SQLDatabase.from_uri(
    connection_url,
    include_tables=['embeddings', 'stock_table'],
    sample_rows_in_table_info=1
)
llm = OpenAI(
    openai_api_key=os.environ["OPENAI_API_KEY"],
    temperature=0,
    verbose=True
)
toolkit = SQLDatabaseToolkit(db=db, llm=llm)

agent_executor = create_sql_agent(
    llm=OpenAI(temperature=0),
    toolkit=toolkit,
    verbose=True,
    top_k=3,
    max_iterations=5
)
Function to process user questions
Now that the LLM is set up, we can write a function that embeds a question and compares it against the most semantically similar question in our database. If the similarity score is above a high threshold (0.97 in the code below), we can assume the user is asking the same kind of question that was asked before, and return the previous answer. If no sufficiently similar question has been asked before, the function calls our model and stores the new question and answer for next time.
def process_user_question(question):
    print(f'\nQuestion asked: {question}')
    category = 'chatbot'

    # Embed the question as a little-endian float32 vector and time the call
    start_time = time.time()
    question_embedding = [np.array(x, '<f4') for x in
                          get_embeddings([question], api_key=apikey, engine=model)]
    elapsed_time = (time.time() - start_time) * 1000
    print(f"Execution time for getting the question embedding: {elapsed_time:.2f} milliseconds")

    # Check if the embedding is similar to existing questions
    params = {'question_embedding': question_embedding}
    stmt = ('select question, answer, '
            'dot_product(%(question_embedding)s, question_embedding) :> float as score '
            'from embeddings where category="chatbot" order by score desc limit 1;')

    with s2_conn.cursor() as cur:
        start_time = time.time()
        cur.execute(stmt, params)
        row = cur.fetchone()
        elapsed_time = (time.time() - start_time) * 1000
        print(f"Execution time for checking existing questions: {elapsed_time:.2f} milliseconds")

    if row is not None:
        question2, answer, score = row
        print(f"\nClosest Matching row:\nQuestion: {question2}\nAnswer: {answer}\nSimilarity Score: {score}")

        # Cache hit: a previous question is close enough, so reuse its answer
        if score > .97:
            print('Action to take: Using existing answer')
            return answer

        print('Action to take: Running agent_executor')
    else:
        # fetchone() returns None when the table has no matching rows yet
        print('No existing rows. Running agent_executor')

    # Cache miss: ask the agent
    start_time = time.time()
    answer2 = agent_executor.run(question)
    elapsed_time = (time.time() - start_time) * 1000
    print(f"agent_executor execution time: {elapsed_time:.2f} milliseconds")

    created_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Embed the new answer
    start_time = time.time()
    answer_embedding = [np.array(x, '<f4') for x in
                        get_embeddings([answer2], api_key=apikey, engine=model)]
    elapsed_time = (time.time() - start_time) * 1000
    print(f"Answer embeddings execution time: {elapsed_time:.2f} milliseconds")

    params = {'category': category, 'question': question,
              'question_embedding': question_embedding,
              'answer': answer2, 'answer_embedding': answer_embedding,
              'created_at': created_at}

    # Send to SingleStoreDB so the next similar question is served from the cache
    stmt = (f"INSERT INTO {table_name} "
            "(category, question, question_embedding, answer, answer_embedding, created_at) "
            "VALUES (%(category)s, %(question)s, %(question_embedding)s, "
            "%(answer)s, %(answer_embedding)s, %(created_at)s)")

    start_time = time.time()
    with s2_conn.cursor() as cur:
        cur.execute(stmt, params)
    elapsed_time = (time.time() - start_time) * 1000
    print(f"Insert to SingleStore execution time: {elapsed_time:.2f} milliseconds")

    return answer2
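The function compares a raw `dot_product` score against a fixed threshold. That only behaves like a similarity in the range -1 to 1 when the stored vectors have unit length. OpenAI documents its embeddings as normalized to length 1, so for them the dot product equals cosine similarity. Embeddings from other models may need to be normalized first. The following sketch, using made-up two-dimensional vectors, illustrates the difference:

```python
import math


def dot(a, b):
    return sum(x * y for x, y in zip(a, b))


def normalize(v):
    """Scale a vector to unit length."""
    length = math.sqrt(dot(v, v))
    return [x / length for x in v]


def cosine(a, b):
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


a = [3.0, 4.0]   # toy vectors, not real embeddings
b = [4.0, 3.0]

raw_score = dot(a, b)                               # grows with vector length
unit_score = dot(normalize(a), normalize(b))        # bounded, comparable to a threshold

print(raw_score, unit_score, cosine(a, b))
```

Here the raw dot product is 24, which would pass any threshold below 1 regardless of meaning, while the normalized score is 0.96 and matches the cosine similarity.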
Putting it to the test
Here are two questions that have nearly the same meaning:
question_1 = "describe the database"
question_2 = "describe database"
Processing the first question will make a call to the model, since it hasn’t been asked before. This took 4.44 seconds to complete.

The next question is asking virtually the same thing, but the semantic cache is leveraged. This takes 286 milliseconds to complete, an improvement of over 15.5x!
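The speedup figure follows directly from the two timings above. As a back-of-the-envelope sketch, the same numbers also give a rough average latency for a workload, assuming every cache miss costs about 4.44 seconds and every hit about 286 milliseconds. The hit rates used here are hypothetical, not measurements:

```python
miss_seconds = 4.44    # measured above: first question, full model call
hit_seconds = 0.286    # measured above: second question, served from the cache

speedup = miss_seconds / hit_seconds
print(f"Speedup on a cache hit: {speedup:.1f}x")


def average_latency(hit_rate):
    """Expected seconds per question for a given fraction of cache hits."""
    return hit_rate * hit_seconds + (1 - hit_rate) * miss_seconds


# Hypothetical hit rates, for illustration only
for hit_rate in (0.0, 0.5, 0.9):
    print(f"hit rate {hit_rate:.0%}: {average_latency(hit_rate):.2f} s on average")
```

Under these assumptions, the benefit scales with how repetitive the questions are: with no repeated questions the cache only adds the lookup overhead, while a workload dominated by near-duplicate questions approaches the cached latency.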

Conclusion
In the rapidly evolving landscape of AI and LLMs, it’s extremely important to ensure that systems are fast, efficient and scalable. A semantic cache layer solves many challenges in production workloads. Combining this with SingleStoreDB, which is built on the same principles, promotes a better developer and user experience, while improving operational efficiency and reducing costs associated with computational resources.