Real Time Recommendation Engine
Notebook
Note
This notebook can be run on a Free Starter Workspace. To create a Free Starter Workspace navigate to Start using the left nav. You can also use your existing Standard or Premium workspace with this Notebook.
How to build a real-time recommendation engine with SingleStore & Vercel
We will demonstrate how to build a modern real-time AI application for free using a Shared Tier Database, SingleStore Notebooks, and Job Service.
A Free SingleStore Starter Workspace enables you to execute hybrid search, real-time analytics, and point reads/writes/updates in a single database. With SingleStore Notebooks and our Job Service, you can easily bring in data from various sources (APIs, MySQL / MongoDB endpoints) in real time. You can also execute Python-based transforms, such as adding embeddings, ensuring that real-time data is readily available for your downstream LLMs and applications.
We will showcase the seamless transition from a prototype to an end application using SingleStore. The final application will be hosted on Vercel. You can see the app we've built following this notebook here.
Architecture:
Scenario:
We will build a recommendation engine that suggests which LLM you should use for your use case, bringing together semantic search and real-time analytics on each LLM's performance to make the recommendations.
Here are the requirements we've set out for this recommendation engine:
Pull data from the Hugging Face Leaderboard on various open-source LLM models and their scores. Pull updated scores for these models every hour.
For each of these models, pull data from Twitter and GitHub on what developers are saying about them and how they are being used in active projects. Pull this data every hour.
Provide an easy 'search' interface where users can describe their use case. When a user describes their use case, perform a hybrid search (vector + full-text search) across the descriptions of these models, what users are saying about them on Twitter, and which GitHub repos are using these LLMs.
Combine the results of the semantic search with analytics on the public benchmarks and the number of likes and downloads for each model.
Power the app entirely on a single SingleStore Free Shared Tier Workspace.
Ensure that all of the latest posts and scores are reflected in the app. Power this entirely with SingleStore Notebooks and the Job Service.
Contents
Step 1: Creating a Starter Workspace
Step 2: Installing & Importing required libraries
Step 3: Setting Key Variables
Step 4: Designing your table schema on SingleStore
Step 5: Creating Helper Functions to load data into SingleStore
Step 6: Loading data with embeddings into SingleStore
Step 7: Building the Recommendation Engine Algorithm on Vercel
Step 1. Create a Starter Workspace
Create a new Workspace Group and select a Starter Workspace. If you do not have this enabled, email pm@singlestore.com.
Step 2. Install and import required libraries
In [1]:
%pip install singlestoredb openai tiktoken beautifulsoup4 pandas python-dotenv Markdown praw tweepy --quiet

import re
import json
import openai
import tiktoken
import requests
import getpass
import pandas as pd
import singlestoredb as s2
import tweepy
import praw
from bs4 import BeautifulSoup
from markdown import markdown
from datetime import datetime
from time import time, sleep
Step 3. Setting Environment variables
3.1. Set the app common variables. Do not change these
In [2]:
MODELS_LIMIT = 100
MODELS_TABLE_NAME = 'models'
MODEL_READMES_TABLE_NAME = 'model_readmes'
MODEL_TWITTER_POSTS_TABLE_NAME = 'model_twitter_posts'
MODEL_REDDIT_POSTS_TABLE_NAME = 'model_reddit_posts'
MODEL_GITHUB_REPOS_TABLE_NAME = 'model_github_repos'
LEADERBOARD_DATASET_URL = 'https://llm-recommender.vercel.app/datasets/leaderboard.json'
TOKENS_LIMIT = 2047
TOKENS_THRESHOLD_LIMIT = TOKENS_LIMIT - 128
3.2. Set the OpenAI variables
We will be using OpenAI's embedding models to create vectors representing our data. The vectors will be stored in the SingleStore Starter Workspace as a column in the relevant tables.
Using OpenAI's LLMs, we will also generate output text after we complete the Retrieval-Augmented Generation steps.
Create a new key
Copy the key and paste it into the OPENAI_API_KEY variable.
In [3]:
OPENAI_API_KEY = getpass.getpass("Enter OpenAI API key here")
3.3. Set the HuggingFace variables
We will be pulling data from Hugging Face about the different models, their usage, and how they score on several evaluation metrics.
Create a new token
Copy the key and paste it into the HF_TOKEN variable.
In [4]:
HF_TOKEN = getpass.getpass("Enter Hugging Face token here")
3.4. Set the Twitter variables
We will be pulling data from Twitter about what users are saying about these models. Since the quality of these models may change over time, we want to capture the sentiment of what people are talking about and using on Twitter.
Add a new app
Fill the form
Generate a Bearer Token and paste it into the TWITTER_BEARER_TOKEN variable.
In [5]:
TWITTER_BEARER_TOKEN = getpass.getpass("enter Twitter Bearer Token here")
3.5. Set the GitHub variables
We will also be pulling data from various GitHub repos to see which models are being referenced and used in which scenarios.
Fill the form
Get an access token and paste it into the GITHUB_ACCESS_TOKEN variable.
In [6]:
GITHUB_ACCESS_TOKEN = getpass.getpass("enter Github Access Token here")
Step 4. Designing and creating your table schemas in SingleStore
We will be storing all of this data in a single Free Shared Tier Database. Through this database, you can write hybrid search queries, run analytics on the model's performance, and get real-time reads/updates.
The next cell defines the following helpers:

connection - database connection used to execute queries
create_tables - function that creates empty tables in the database
drop_table - helper function to drop a table
get_models - helper function to get models from the models table
db_get_last_created_at - helper function to get the last created_at value from a table
The create_tables function creates the following tables:

models_table - table with all model data from the Open LLM Leaderboard
readmes_table - table with model readme texts from the Hugging Face model pages (used in semantic search)
twitter_posts - table with tweets related to models (used in semantic search)
github_repos - table with GitHub readme texts related to models (used in semantic search)
Action Required
Make sure to select a database from the drop-down menu at the top of this notebook. It updates the connection_url to connect to that database.
In [7]:
connection = s2.connect(connection_url)


def create_tables():
    def create_models_table():
        with connection.cursor() as cursor:
            cursor.execute(f'''
                CREATE TABLE IF NOT EXISTS {MODELS_TABLE_NAME} (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    name VARCHAR(512) NOT NULL,
                    author VARCHAR(512) NOT NULL,
                    repo_id VARCHAR(1024) NOT NULL,
                    score DECIMAL(5, 2) NOT NULL,
                    arc DECIMAL(5, 2) NOT NULL,
                    hellaswag DECIMAL(5, 2) NOT NULL,
                    mmlu DECIMAL(5, 2) NOT NULL,
                    truthfulqa DECIMAL(5, 2) NOT NULL,
                    winogrande DECIMAL(5, 2) NOT NULL,
                    gsm8k DECIMAL(5, 2) NOT NULL,
                    link VARCHAR(255) NOT NULL,
                    downloads INT,
                    likes INT,
                    still_on_hub BOOLEAN NOT NULL,
                    created_at TIMESTAMP,
                    embedding BLOB
                )
            ''')

    def create_model_readmes_table():
        with connection.cursor() as cursor:
            cursor.execute(f'''
                CREATE TABLE IF NOT EXISTS {MODEL_READMES_TABLE_NAME} (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    model_repo_id VARCHAR(512),
                    text LONGTEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci,
                    clean_text LONGTEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci,
                    created_at TIMESTAMP,
                    embedding BLOB
                )
            ''')

    def create_model_twitter_posts_table():
        with connection.cursor() as cursor:
            cursor.execute(f'''
                CREATE TABLE IF NOT EXISTS {MODEL_TWITTER_POSTS_TABLE_NAME} (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    model_repo_id VARCHAR(512),
                    post_id VARCHAR(256),
                    clean_text LONGTEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci,
                    created_at TIMESTAMP,
                    embedding BLOB
                )
            ''')

    def create_model_github_repos_table():
        with connection.cursor() as cursor:
            cursor.execute(f'''
                CREATE TABLE IF NOT EXISTS {MODEL_GITHUB_REPOS_TABLE_NAME} (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    model_repo_id VARCHAR(512),
                    repo_id INT,
                    name VARCHAR(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci,
                    description TEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci,
                    clean_text LONGTEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci,
                    link VARCHAR(256),
                    created_at TIMESTAMP,
                    embedding BLOB
                )
            ''')

    create_models_table()
    create_model_readmes_table()
    create_model_twitter_posts_table()
    create_model_github_repos_table()


def drop_table(table_name: str):
    with connection.cursor() as cursor:
        cursor.execute(f'DROP TABLE IF EXISTS {table_name}')


def get_models(select='*', query='', as_dict=True):
    with connection.cursor() as cursor:
        _query = f'SELECT {select} FROM {MODELS_TABLE_NAME}'

        if query:
            _query += f' {query}'

        cursor.execute(_query)

        if as_dict:
            columns = [desc[0] for desc in cursor.description]
            return [dict(zip(columns, row)) for row in cursor.fetchall()]

        return cursor.fetchall()


def db_get_last_created_at(table, repo_id, to_string=False):
    with connection.cursor() as cursor:
        cursor.execute(f"""
            SELECT UNIX_TIMESTAMP(created_at) FROM {table}
            WHERE model_repo_id = '{repo_id}'
            ORDER BY created_at DESC
            LIMIT 1
        """)

        rows = cursor.fetchone()
        created_at = float(rows[0]) if rows and rows[0] else None

        if created_at and to_string:
            created_at = datetime.fromtimestamp(created_at)
            created_at = created_at.strftime('%Y-%m-%dT%H:%M:%SZ')

        return created_at
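Once data is loaded (Step 6), the embedding columns can be queried directly with SingleStore's built-in DOT_PRODUCT over the vectors packed at insert time with JSON_ARRAY_PACK. Below is a minimal sanity-check sketch; the self-join against the first model's own embedding is purely illustrative, not part of the final app.

# Illustrative sanity check (run after Step 6 loads data): rank models by
# DOT_PRODUCT similarity against the first model's own embedding.
with connection.cursor() as cursor:
    cursor.execute(f'''
        SELECT m.name,
               DOT_PRODUCT(m.embedding, probe.embedding) AS similarity
        FROM {MODELS_TABLE_NAME} m
        CROSS JOIN (SELECT embedding FROM {MODELS_TABLE_NAME} LIMIT 1) probe
        ORDER BY similarity DESC
        LIMIT 5
    ''')
    print(cursor.fetchall())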
Step 5. Creating helper functions to load data into SingleStore
5.1. Setting up the openai.api_key
In [8]:
openai.api_key = OPENAI_API_KEY
5.2. Create the create_embedding function
This function creates embeddings for the data passed to it. We will apply it to all data pulled from GitHub, Hugging Face, and Twitter. The resulting vector embeddings are stored in the same SingleStore table as a separate column.
In [9]:
def count_tokens(text: str):
    enc = tiktoken.get_encoding('cl100k_base')
    return len(enc.encode(text, disallowed_special={}))


def create_embedding(input):
    try:
        data = openai.embeddings.create(input=input, model='text-embedding-ada-002').data
        return data[0].embedding
    except Exception as e:
        print(e)
        return [[]]
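As a quick sanity check, the helpers can be exercised like this (the sample text is arbitrary, and the embedding call requires a valid OPENAI_API_KEY):

# Quick usage check for the helpers above; the sample text is arbitrary.
sample = 'A small open-source model tuned for code generation.'
print(count_tokens(sample))        # number of cl100k_base tokens in the text
vector = create_embedding(sample)  # list of floats from text-embedding-ada-002
print(len(vector))                 # -> 1536, if the API call succeeds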
5.3. Create the helper functions to parse the data ingested from the various sources
This is a set of functions that ensure the JSON is in the right format so it can be stored in SingleStore as a JSON column. In your Free Shared Tier workspace, you can bring in data of various formats (JSON, geospatial, vector) and interact with it through SQL and the MongoDB API.
In [10]:
class JSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.strftime('%Y-%m-%d %H:%M:%S')
        return super().default(obj)


def list_into_chunks(lst, chunk_size=100):
    return [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]


def string_into_chunks(string: str, max_tokens=TOKENS_LIMIT):
    if count_tokens(string) <= max_tokens:
        return [string]

    delimiter = ' '
    words = string.split(delimiter)
    chunks = []
    current_chunk = []

    for word in words:
        if count_tokens(delimiter.join(current_chunk + [word])) <= max_tokens:
            current_chunk.append(word)
        else:
            chunks.append(delimiter.join(current_chunk))
            current_chunk = [word]

    if current_chunk:
        chunks.append(delimiter.join(current_chunk))

    return chunks


def clean_string(string: str):
    def strip_html_elements(string: str):
        html = markdown(string)
        soup = BeautifulSoup(html, "html.parser")
        text = soup.get_text()
        return text.strip()

    def remove_unicode_escapes(string: str):
        return re.sub(r'[^\x00-\x7F]+', '', string)

    def remove_string_spaces(string: str):
        new_string = re.sub(r'\n+', '\n', string)
        new_string = re.sub(r'\s+', ' ', new_string)
        return new_string

    def remove_links(string: str):
        url_pattern = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
        return re.sub(url_pattern, '', string)

    new_string = strip_html_elements(string)
    new_string = remove_unicode_escapes(new_string)
    new_string = remove_string_spaces(new_string)
    new_string = re.sub(r'\*\*+', '*', new_string)
    new_string = re.sub(r'--+', '-', new_string)
    new_string = re.sub(r'====+', '=', new_string)
    new_string = remove_links(new_string)
    return new_string
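For example, passing a short markdown snippet through these helpers strips the formatting and links, and would split longer text into embedding-sized chunks (the sample input is made up for illustration):

# Example usage; the markdown snippet is made up for illustration.
raw = '# My Model\n\nSee the **docs** at https://example.com for details.'
cleaned = clean_string(raw)           # markdown stripped, link removed
chunks = string_into_chunks(cleaned)  # a single chunk, since the text is short
print(cleaned, len(chunks))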
Step 6. Loading Data into SingleStore
6.1. Load Data on all Open-Source LLM models from the Hugging Face Leaderboard
This function loads a pre-generated Open LLM Leaderboard dataset. Based on this dataset, all model records are created and inserted into the database. We will also create embeddings for all of this data using the OpenAI embedding model.
In [11]:
def leaderboard_get_df():
    response = requests.get(LEADERBOARD_DATASET_URL)

    if response.status_code == 200:
        data = json.loads(response.text)
        df = pd.DataFrame(data).head(MODELS_LIMIT)
        return df
    else:
        print("Failed to retrieve JSON file")


def leaderboard_insert_model(model):
    try:
        _model = {key: value for key, value in model.items() if key != 'readme'}
        to_embedding = json.dumps(_model, cls=JSONEncoder)
        embedding = str(create_embedding(to_embedding))
        model_to_insert = {**_model, 'embedding': embedding}
        readmes_to_insert = []

        if model['readme']:
            readme = {
                'model_repo_id': model['repo_id'],
                'text': model['readme'],
                'created_at': time()
            }

            if count_tokens(readme['text']) <= TOKENS_THRESHOLD_LIMIT:
                readme['clean_text'] = clean_string(readme['text'])
                to_embedding = json.dumps({
                    'model_repo_id': readme['model_repo_id'],
                    'clean_text': readme['clean_text'],
                })
                readme['embedding'] = str(create_embedding(to_embedding))
                readmes_to_insert.append(readme)
            else:
                for i, chunk in enumerate(string_into_chunks(readme['text'])):
                    _readme = {**readme, 'text': chunk, 'created_at': time()}
                    _readme['clean_text'] = clean_string(chunk)
                    to_embedding = json.dumps({
                        'model_repo_id': _readme['model_repo_id'],
                        'clean_text': chunk,
                    })
                    _readme['embedding'] = str(create_embedding(to_embedding))
                    readmes_to_insert.append(_readme)

        with connection.cursor() as cursor:
            cursor.execute(f'''
                INSERT INTO {MODELS_TABLE_NAME} (
                    name, author, repo_id, score, link, still_on_hub, arc, hellaswag,
                    mmlu, truthfulqa, winogrande, gsm8k, downloads, likes, created_at, embedding
                )
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, FROM_UNIXTIME(%s), JSON_ARRAY_PACK(%s))
            ''', tuple(model_to_insert.values()))

        for chunk in list_into_chunks([tuple(readme.values()) for readme in readmes_to_insert]):
            with connection.cursor() as cursor:
                cursor.executemany(f'''
                    INSERT INTO {MODEL_READMES_TABLE_NAME} (model_repo_id, text, created_at, clean_text, embedding)
                    VALUES (%s, %s, FROM_UNIXTIME(%s), %s, JSON_ARRAY_PACK(%s))
                ''', chunk)
    except Exception as e:
        print('Error leaderboard_insert_model: ', e)


def leaderboard_process_models():
    print('Processing models')
    existed_model_repo_ids = [i[0] for i in get_models('repo_id', as_dict=False)]
    leaderboard_df = leaderboard_get_df()

    for i, row in leaderboard_df.iterrows():
        if not row['repo_id'] in existed_model_repo_ids:
            leaderboard_insert_model(row.to_dict())
6.2. Loading Data from GitHub about model usage
We will search the GitHub API by keyword, based on the model names above, to find usage across repos. We will then pull data from the READMEs of the repos that reference a particular model and create an embedding for it.
This allows us to see the kinds of scenarios in which developers are using a particular LLM and incorporate that as part of our recommendation.
In the first step, we search for the model using the GitHub API.
In [12]:
def github_search_repos(keyword: str, last_created_at):
    repos = []
    headers = {'Authorization': f'token {GITHUB_ACCESS_TOKEN}'}
    query = f'"{keyword}" in:name,description,readme'

    if last_created_at:
        query += f' created:>{last_created_at}'

    try:
        repos_response = requests.get(
            "https://api.github.com/search/repositories",
            headers=headers,
            params={'q': query}
        )

        if repos_response.status_code == 403:
            # Handle rate limiting
            rate_limit = repos_response.headers['X-RateLimit-Reset']

            if not rate_limit:
                return repos

            sleep_time = int(rate_limit) - int(time())

            if sleep_time > 0:
                print(f"Rate limit exceeded. Retrying in {sleep_time} seconds.")
                sleep(sleep_time)
                return github_search_repos(keyword, last_created_at)

        if repos_response.status_code != 200:
            return repos

        for repo in repos_response.json().get('items', []):
            try:
                readme_response = requests.get(repo['contents_url'].replace('{+path}', 'README.md'), headers=headers)

                if readme_response.status_code != 200:
                    continue

                readme_file = readme_response.json()

                if readme_file['size'] > 7000:
                    continue

                readme_text = requests.get(readme_file['download_url']).text

                if not readme_text:
                    continue

                repos.append({
                    'repo_id': repo['id'],
                    'name': repo['name'],
                    'link': repo['html_url'],
                    'created_at': datetime.strptime(repo['created_at'], '%Y-%m-%dT%H:%M:%SZ').timestamp(),
                    'description': repo.get('description', ''),
                    'readme': readme_text,
                })
            except Exception:
                continue
    except Exception:
        return repos

    return repos
After we conduct this search, we will insert the results into another table in the database. The inserted data will have embeddings associated with it.
In [13]:
def github_insert_model_repos(model_repo_id, repos):
    for repo in repos:
        try:
            values = []
            value = {
                'model_repo_id': model_repo_id,
                'repo_id': repo['repo_id'],
                'name': repo['name'],
                'description': repo['description'],
                'clean_text': clean_string(repo['readme']),
                'link': repo['link'],
                'created_at': repo['created_at'],
            }
            to_embedding = {
                'model_repo_id': model_repo_id,
                'name': value['name'],
                'description': value['description'],
                'clean_text': value['clean_text']
            }

            if count_tokens(value['clean_text']) <= TOKENS_THRESHOLD_LIMIT:
                embedding = str(create_embedding(json.dumps(to_embedding)))
                values.append({**value, 'embedding': embedding})
            else:
                for chunk in string_into_chunks(value['clean_text']):
                    embedding = str(create_embedding(json.dumps({**to_embedding, 'clean_text': chunk})))
                    values.append({**value, 'clean_text': chunk, 'embedding': embedding})

            for chunk in list_into_chunks([list(value.values()) for value in values]):
                with connection.cursor() as cursor:
                    cursor.executemany(f'''
                        INSERT INTO {MODEL_GITHUB_REPOS_TABLE_NAME} (model_repo_id, repo_id, name, description, clean_text, link, created_at, embedding)
                        VALUES (%s, %s, %s, %s, %s, %s, FROM_UNIXTIME(%s), JSON_ARRAY_PACK(%s))
                    ''', chunk)
        except Exception as e:
            print('Error github_insert_model_repos: ', e)


def github_process_models_repos(existed_models):
    print('Processing GitHub repos')

    for model in existed_models:
        try:
            repo_id = model['repo_id']
            last_created_at = db_get_last_created_at(MODEL_GITHUB_REPOS_TABLE_NAME, repo_id, True)
            keyword = model['name'] if re.search(r'\d', model['name']) else repo_id
            found_repos = github_search_repos(keyword, last_created_at)

            if len(found_repos):
                github_insert_model_repos(repo_id, found_repos)
        except Exception as e:
            print('Error github_process_models_repos: ', e)
6.3. Load Data from Twitter about these models.
First, we will use the Twitter API to search for posts based on the model names we have.
In [14]:
twitter = tweepy.Client(TWITTER_BEARER_TOKEN)


def twitter_search_posts(keyword, last_created_at):
    posts = []

    try:
        tweets = twitter.search_recent_tweets(
            query=f'{keyword} -is:retweet',
            tweet_fields=['id', 'text', 'created_at'],
            start_time=last_created_at,
            max_results=100
        )

        for tweet in tweets.data:
            posts.append({
                'post_id': tweet.id,
                'text': tweet.text,
                'created_at': tweet.created_at,
            })
    except Exception:
        return posts

    return posts
Next, we will add the text from the posts per model into another table. This table will also have embeddings associated with it.
In [15]:
def twitter_insert_model_posts(model_repo_id, posts):
    for post in posts:
        try:
            values = []
            value = {
                'model_repo_id': model_repo_id,
                'post_id': post['post_id'],
                'clean_text': clean_string(post['text']),
                'created_at': post['created_at'],
            }
            to_embedding = {
                'model_repo_id': value['model_repo_id'],
                'clean_text': value['clean_text']
            }
            embedding = str(create_embedding(json.dumps(to_embedding)))
            values.append({**value, 'embedding': embedding})

            for chunk in list_into_chunks([list(value.values()) for value in values]):
                with connection.cursor() as cursor:
                    cursor.executemany(f'''
                        INSERT INTO {MODEL_TWITTER_POSTS_TABLE_NAME} (model_repo_id, post_id, clean_text, created_at, embedding)
                        VALUES (%s, %s, %s, %s, JSON_ARRAY_PACK(%s))
                    ''', chunk)
        except Exception as e:
            print('Error twitter_insert_model_posts: ', e)


def twitter_process_models_posts(existed_models):
    print('Processing Twitter posts')

    for model in existed_models:
        try:
            repo_id = model['repo_id']
            last_created_at = db_get_last_created_at(MODEL_TWITTER_POSTS_TABLE_NAME, repo_id, True)
            keyword = model['name'] if re.search(r'\d', model['name']) else repo_id
            found_posts = twitter_search_posts(keyword, last_created_at)

            if len(found_posts):
                twitter_insert_model_posts(repo_id, found_posts)
        except Exception as e:
            print('Error twitter_process_models_posts: ', e)
6.4. Run the functions we've created above to load the data into SingleStore
First, the notebook creates tables in the database if they don't exist.
Next, the notebook retrieves the specified number of models from the Open LLM Leaderboard dataset, creates embeddings, and inserts the data into the models and model_readmes tables.
Next, it executes a query to retrieve all the models in the database. Based on these models, Twitter posts and GitHub repositories are searched, converted into embeddings, and inserted into their respective tables.
Finally, we get a ready set of data for finding the most appropriate model for any use case using semantic search.
In [16]:
create_tables()
leaderboard_process_models()

existed_models = get_models('repo_id, name', f'ORDER BY score DESC LIMIT {MODELS_LIMIT}')

twitter_process_models_posts(existed_models)
github_process_models_repos(existed_models)
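To see the semantic-search side of this in action before moving to the app, here is a minimal sketch, assuming the cells above completed successfully. The use-case string is an arbitrary example, not part of the app.

# Minimal semantic-search sketch over the readme chunks; the use-case text
# is an arbitrary example, and results depend on what Step 6 loaded.
user_use_case = 'summarizing long legal documents'
query_embedding = str(create_embedding(user_use_case))

with connection.cursor() as cursor:
    cursor.execute(f'''
        SELECT model_repo_id,
               DOT_PRODUCT(embedding, JSON_ARRAY_PACK(%s)) AS similarity
        FROM {MODEL_READMES_TABLE_NAME}
        ORDER BY similarity DESC
        LIMIT 5
    ''', (query_embedding,))
    print(cursor.fetchall())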
(Optional) 6.5 Run this notebook every hour using our built-in Job Service
By scheduling this notebook to run every hour, you will pull the latest data from Hugging Face on new models, their scores, and their likes/downloads. This ensures that you capture the latest sentiment and usage from developers on Twitter and GitHub.
SingleStore Notebooks + the Job Service make it really easy to bring real-time data to your vector-based searches and downstream AI/ML models. You can ensure that the data is in the right format and apply Python-based transformations, such as creating embeddings, to the most newly ingested data. This would previously have required a combination of several serverless technologies alongside your database, as we wrote about previously.
(Optional) Step 7: Host the app with Vercel
Follow our GitHub repo, where we showcase how to write the front-end code of the app that performs the vector similarity search to provide the results. The front end is built with our Elegance SDK and hosted on Vercel.
See our guide on our Vercel integration with SingleStore. We have a public version of the app running for free here.
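The ranking the app performs can be approximated in one query that blends semantic similarity with leaderboard analytics. Below is a hedged sketch of that idea, assuming the tables from Step 6 are populated; the recommend_models helper and the 0.7 / 0.3 weights are illustrative assumptions, not the app's actual code or tuning.

# A hedged sketch of the recommendation ranking: semantic similarity over
# readme chunks blended with the leaderboard score. The helper name and
# the 0.7 / 0.3 weights are illustrative assumptions.
def recommend_models(use_case: str, limit: int = 5):
    query_embedding = str(create_embedding(use_case))

    with connection.cursor() as cursor:
        cursor.execute(f'''
            SELECT m.name, m.repo_id, m.score, m.likes, m.downloads,
                   MAX(DOT_PRODUCT(r.embedding, JSON_ARRAY_PACK(%s))) AS similarity
            FROM {MODELS_TABLE_NAME} m
            JOIN {MODEL_READMES_TABLE_NAME} r ON r.model_repo_id = m.repo_id
            GROUP BY m.name, m.repo_id, m.score, m.likes, m.downloads
            ORDER BY 0.7 * similarity + 0.3 * (m.score / 100) DESC
            LIMIT %s
        ''', (query_embedding, limit))

        return cursor.fetchall()


recommend_models('a chat assistant for customer support')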
Details
About this Template
We demonstrate how to build and host a real-time recommendation engine for free with SingleStore. The notebook also leverages our new SingleStore Job Service to ensure that the latest data is ingested and used in providing recommendations.
This Notebook can be run in Shared Tier, Standard and Enterprise deployments.
Tags
License
This Notebook has been released under the Apache 2.0 open source license.