Large Language Models (LLMs) like OpenAI’s ChatGPT are very large in size and complexity. They are at the center of numerous applications, ranging from chatbots to robust generative tasks.
While they have an impressive ability to understand and output human-like text, they also present challenges when deployed in production — particularly in terms of latency and computational cost. A semantic cache layer addresses many of these challenges in LLM production workloads.
What is a semantic cache layer?
A semantic cache stores previous results like a traditional cache, but it also takes the semantic meaning of the query into account. This means that a query that is not an exact match for an earlier one can still be served the earlier answer if the intent of the request is the same.
This is crucial for LLM production workloads for a number of reasons:
- Repetitive queries. Users asking very similar questions do not need to repeatedly invoke the LLM.
- Reduced latency. A single call to the model can be resource intensive and time consuming. A semantic cache can answer questions almost instantaneously for a better user experience.
- Scalability. Handling more simultaneous requests can strain the system, and a semantic cache significantly reduces demand for computational resources.
- Cost. Lower operational costs by reducing the number of calls to the model.
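To make the idea of matching on meaning rather than on exact text concrete, here is a minimal in-memory sketch. It is not the SingleStoreDB implementation built later in this post: the class name `ToySemanticCache`, the three-dimensional vectors and the cached answer are all made up for illustration, and a real cache would use embeddings produced by a model.

```python
import math

def cosine_similarity(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

class ToySemanticCache:
    """Hypothetical in-memory cache, for illustration only."""

    def __init__(self, threshold=0.97):
        self.threshold = threshold
        self.entries = []  # list of (embedding, answer) pairs

    def put(self, embedding, answer):
        self.entries.append((embedding, answer))

    def get(self, embedding):
        """Return the answer of the closest entry, or None on a cache miss."""
        best_score, best_answer = -1.0, None
        for cached_embedding, answer in self.entries:
            score = cosine_similarity(embedding, cached_embedding)
            if score > best_score:
                best_score, best_answer = score, answer
        return best_answer if best_score > self.threshold else None

# Made-up 3-dimensional "embeddings"; real ones have far more dimensions.
cache = ToySemanticCache(threshold=0.97)
cache.put([0.90, 0.10, 0.40], "cached answer about the database")

hit = cache.get([0.88, 0.12, 0.41])   # points in nearly the same direction
miss = cache.get([0.10, 0.95, 0.05])  # points in a different direction
print(hit, miss)
```

The first lookup is close enough to the stored vector to reuse its answer, while the second falls below the threshold and would fall through to the model.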
Why use SingleStoreDB as the semantic cache layer?
SingleStoreDB is a real-time, distributed database designed for fast queries, with an architecture that supports hybrid transactional and analytical workloads. This pairs well with generative AI use cases because the same system can read and write data for both training and real-time tasks, without the added complexity and data movement of using multiple products for the same job. SingleStoreDB also has a built-in plan cache that speeds up subsequent queries that share the same plan.
Let's build this!
Tables
To illustrate this, we have a stock ticker data table:
CREATE TABLE stock_table (
  ticker varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  created_at datetime DEFAULT NULL,
  `open` float DEFAULT NULL,
  `high` float DEFAULT NULL,
  `low` float DEFAULT NULL,
  `close` float DEFAULT NULL,
  volume int(11) DEFAULT NULL,
  SORT KEY (ticker, created_at desc),
  SHARD KEY (ticker)
);
The table is populated with about 6 million records. Here are five random rows:
There is also a table to store the vector embeddings:
CREATE TABLE embeddings (
  id bigint(11) NOT NULL AUTO_INCREMENT,
  category varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  question longtext CHARACTER SET utf8 COLLATE utf8_general_ci,
  question_embedding longblob,
  answer longtext CHARACTER SET utf8 COLLATE utf8_general_ci,
  answer_embedding longblob,
  created_at datetime DEFAULT NULL,
  UNIQUE KEY `PRIMARY` (id) USING HASH,
  SHARD KEY __SHARDKEY (id),
  SORT KEY __UNORDERED ()
);
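The two longblob columns hold each embedding as raw bytes. As a rough sketch of what that byte layout looks like, assuming little-endian 32-bit floats (the '<f4' dtype that the Python code in this post passes to np.array), the standard library's struct module can pack and unpack a vector. The helper names pack_embedding and unpack_embedding are hypothetical and are not part of the SingleStoreDB client.

```python
import struct

def pack_embedding(values):
    """Pack floats into little-endian float32 bytes suitable for a blob column."""
    return struct.pack(f"<{len(values)}f", *values)

def unpack_embedding(blob):
    """Inverse of pack_embedding: every float32 value occupies 4 bytes."""
    count = len(blob) // 4
    return list(struct.unpack(f"<{count}f", blob))

vector = [0.5, -1.25, 2.0]   # chosen because float32 represents them exactly
blob = pack_embedding(vector)
restored = unpack_embedding(blob)
print(len(blob), restored)
```

At 4 bytes per value, a 1,536-dimensional text-embedding-ada-002 vector takes 6,144 bytes per row in each embedding column.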
Next, import the libraries, set the OpenAI API key and connect to SingleStoreDB:
import os
import time
import getpass
from datetime import datetime

import numpy as np
import openai
import singlestoredb as s2
from sqlalchemy import create_engine
from langchain.agents import create_sql_agent
from langchain.agents import AgentExecutor
from langchain.agents.agent_toolkits import SQLDatabaseToolkit
from langchain.sql_database import SQLDatabase
from langchain.llms.openai import OpenAI
from openai.embeddings_utils import get_embeddings

apikey = getpass.getpass("Enter openai apikey here")
os.environ["OPENAI_API_KEY"] = apikey

model = 'text-embedding-ada-002'
table_name = 'embeddings'

# connection_url must already hold the connection string for your SingleStoreDB workspace
s2_conn = s2.connect(connection_url)
Agent executor
# Create the agent executor
db = SQLDatabase.from_uri(
    connection_url,
    include_tables=['embeddings', 'stock_table'],
    sample_rows_in_table_info=1
)
llm = OpenAI(
    openai_api_key=os.environ["OPENAI_API_KEY"],
    temperature=0,
    verbose=True
)
toolkit = SQLDatabaseToolkit(db=db, llm=llm)
agent_executor = create_sql_agent(
    llm=OpenAI(temperature=0),
    toolkit=toolkit,
    verbose=True,
    top_k=3,
    max_iterations=5
)
Function to process user questions
Now that the LLM is set up, we can write a function that embeds a question and compares it against the most semantically similar question already stored in our database. If the similarity score is above a high threshold (0.97 in the code that follows), we can assume the user is asking the same kind of question that has been asked before, and return the previous answer. Otherwise, the function calls the model, then stores the new question and answer so that later lookups can reuse them.
def run_agent_and_cache(question, question_embedding, category):
    # Cache miss: ask the agent, then store the question and answer for reuse
    start_time = time.time()
    answer2 = agent_executor.run(question)
    elapsed_time = (time.time() - start_time) * 1000
    print(f"agent_executor execution time: {elapsed_time:.2f} milliseconds")

    created_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Embed the answer so it is stored alongside the question embedding
    start_time = time.time()
    answer_embedding = [np.array(x, '<f4') for x in
                        get_embeddings([answer2], api_key=apikey, engine=model)]
    elapsed_time = (time.time() - start_time) * 1000
    print(f"Answer embeddings execution time: {elapsed_time:.2f} milliseconds")

    params = {'category': category,
              'question': question,
              'question_embedding': question_embedding,
              'answer': answer2,
              'answer_embedding': answer_embedding,
              'created_at': created_at}

    # Send to SingleStoreDB
    stmt = (f"INSERT INTO {table_name} "
            "(category, question, question_embedding, answer, answer_embedding, created_at) "
            "VALUES (%(category)s, %(question)s, %(question_embedding)s, "
            "%(answer)s, %(answer_embedding)s, %(created_at)s)")
    start_time = time.time()
    with s2_conn.cursor() as cur:
        cur.execute(stmt, params)
    elapsed_time = (time.time() - start_time) * 1000
    print(f"Insert to SingleStore execution time: {elapsed_time:.2f} milliseconds")
    return answer2


def process_user_question(question):
    print(f'\nQuestion asked: {question}')
    category = 'chatbot'

    # Embed the incoming question
    start_time = time.time()
    question_embedding = [np.array(x, '<f4') for x in
                          get_embeddings([question], api_key=apikey, engine=model)]
    elapsed_time = (time.time() - start_time) * 1000
    print(f"Execution time for getting the question embedding: {elapsed_time:.2f} milliseconds")

    params = {'question_embedding': question_embedding}

    # Check if embedding is similar to existing questions
    stmt = ('select question, answer, '
            'dot_product(%(question_embedding)s, question_embedding) :> float as score '
            'from embeddings where category="chatbot" '
            'order by score desc limit 1;')
    with s2_conn.cursor() as cur:
        start_time = time.time()
        cur.execute(stmt, params)
        row = cur.fetchone()
        elapsed_time = (time.time() - start_time) * 1000
        print(f"Execution time for checking existing questions: {elapsed_time:.2f} milliseconds")

    # An empty table returns no row at all, so go straight to the agent
    if row is None:
        print('No existing rows. Running agent_executor')
        return run_agent_and_cache(question, question_embedding, category)

    question2, answer, score = row
    print(f"\nClosest Matching row:\nQuestion: {question2}\nAnswer: {answer}\nSimilarity Score: {score}")

    if score > .97:
        print('Action to take: Using existing answer')
        return answer

    print('Action to take: Running agent_executor')
    return run_agent_and_cache(question, question_embedding, category)
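The lookup query ranks rows with dot_product and compares the score against 0.97. A dot product only behaves like a cosine similarity when both vectors have unit length; OpenAI documents that its embeddings are normalized to length 1, which is what makes a fixed threshold meaningful here. The sketch below illustrates the difference in plain Python with made-up two-dimensional vectors; the helper names are hypothetical, and it is worth checking the normalization assumption before reusing the threshold with a different embedding model.

```python
import math

SIMILARITY_THRESHOLD = 0.97  # same cutoff as the function in this post

def dot(a, b):
    """Plain dot product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))

def normalize(vec):
    """Scale a vector to unit length."""
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec]

# Made-up vectors that are deliberately NOT unit length.
a = [3.0, 4.0]
b = [4.0, 3.0]

raw_score = dot(a, b)                          # depends on vector length
unit_score = dot(normalize(a), normalize(b))   # cosine similarity

# Only the normalized score can be compared against a fixed threshold.
use_cached_answer = unit_score > SIMILARITY_THRESHOLD
print(raw_score, unit_score, use_cached_answer)
```

The raw score is far above the threshold even though the two vectors point in noticeably different directions, while the normalized score falls just below it.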
Putting it to the test
Here are two questions that have nearly the same meaning:
question_1 = "describe the database"
question_2 = "describe database"
Processing the first question will make a call to the model, since it hasn’t been asked before. This took 4.44 seconds to complete.
The next question is asking virtually the same thing, but the semantic cache is leveraged. This takes 286 milliseconds to complete, an improvement of over 15.5x!
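The arithmetic behind that figure, and what it might mean for average latency, can be sketched as follows. The two timings are the ones reported above; the hit rates are purely illustrative assumptions, and the calculation assumes these two single measurements are representative of every hit and miss, which real traffic will not match exactly.

```python
HIT_LATENCY_S = 0.286   # cached answer, from the second question
MISS_LATENCY_S = 4.44   # full agent call, from the first question

def expected_latency(hit_rate):
    """Average latency in seconds for a given cache hit rate (0.0 to 1.0)."""
    if not 0.0 <= hit_rate <= 1.0:
        raise ValueError("hit_rate must be between 0 and 1")
    return hit_rate * HIT_LATENCY_S + (1.0 - hit_rate) * MISS_LATENCY_S

speedup = MISS_LATENCY_S / HIT_LATENCY_S
print(f"speedup on a cache hit: {speedup:.1f}x")

# Hypothetical hit rates, chosen only to show the trend
for rate in (0.0, 0.25, 0.5, 0.9):
    print(f"hit rate {rate:.0%}: {expected_latency(rate):.2f} s on average")
```

The benefit scales with how often users repeat or rephrase earlier questions, so the hit rate observed in production determines the real saving.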
Conclusion
In the rapidly evolving landscape of AI and LLMs, it’s extremely important to ensure that systems are fast, efficient and scalable. A semantic cache layer solves many challenges in production workloads. Combining this with SingleStoreDB, which is built on the same principles, promotes a better developer and user experience while improving operational efficiency and reducing the costs associated with computational resources.