

Building Query Lambdas With SingleStore's TVF and Data API, Part II

In part I of this blog, we demonstrated how to create parameterized queries (similar to Rockset's Query Lambdas) using SingleStore's Table-Valued Functions (TVF) and the Data API.


In part II, we’ll show you how to schedule these API calls to replicate Rockset’s scheduled Query Lambdas functionality via SingleStore’s Job Service.

Step 1. Setting up SingleStore

We’ll run through similar steps outlined in part I to set up a demo database. However, in this example, our TVF is going to query the users table to get the total user signups from the previous hour. This result will then be written to the hourly_user_signups table.

Create a SingleStore database

CREATE DATABASE my_database;
USE my_database;

Create tables in SingleStore

CREATE TABLE users (
user_id INT PRIMARY KEY,
username VARCHAR(255),
email VARCHAR(255),
created_at TIMESTAMP
);
CREATE TABLE hourly_user_signups (
signup_datetime TIMESTAMP,
total_signups INT
);

Insert sample data

INSERT INTO users (user_id, username, email, created_at) VALUES
(1, 'john_doe', 'john@example.com', NOW()),
(2, 'jane_doe', 'jane@example.com', NOW());

Create a TVF to query the user table

CREATE FUNCTION get_hourly_signups()
RETURNS TABLE AS RETURN
SELECT DATE_TRUNC('hour', created_at) AS signup_datetime,
       COUNT(*) AS total_signups
FROM users
WHERE created_at >= DATE_TRUNC('hour', NOW()) - INTERVAL 1 HOUR
  AND created_at < DATE_TRUNC('hour', NOW())
GROUP BY 1;
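The WHERE clause selects rows whose timestamps fall in the previous clock hour: greater than or equal to the top of the last hour, and strictly less than the top of the current hour. That window arithmetic can be mirrored in plain Python as a sanity check; the `previous_hour_window` helper below is our own illustration, not part of SingleStore.

```python
from datetime import datetime, timedelta

def previous_hour_window(now):
    # Mirror DATE_TRUNC('hour', NOW()): truncate to the top of the current hour...
    hour_start = now.replace(minute=0, second=0, microsecond=0)
    # ...then step back one hour for the inclusive lower bound
    return hour_start - timedelta(hours=1), hour_start

lo, hi = previous_hour_window(datetime(2024, 5, 1, 10, 42, 17))
print(lo, hi)  # 2024-05-01 09:00:00 2024-05-01 10:00:00
```

A signup at exactly 10:00:00 is excluded from the 09:00–10:00 window and counted in the next run, so no row is double-counted.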

Step 2. Create a SingleStore Notebook to perform your desired task

SingleStore Notebooks are hosted within SingleStore’s compute service, allowing you to execute both Python and SQL code.

For our use case, we’re going to create a notebook that uses the TVF defined in step one and writes its result set to the hourly_user_signups table, recording the total signups from the previous hour. In step three, we’ll use the Job Service to run this notebook on a time-based schedule so that hourly_user_signups is continually updated with the most recent signup counts.

We’ll run through each of the code blocks in the notebook (appendix one contains a screenshot of the notebook). First we’ll import the required Python libraries and create a connection to SingleStore via our native Python connector.

# Import required libraries
import requests
import singlestoredb as s2

# Create a connection; inside a SingleStore notebook, s2.connect()
# picks up the connection details automatically
conn = s2.connect()
cur = conn.cursor()

The next step is to execute the TVF via SingleStore’s Data API. This will return a Python dictionary containing the total signups from the users table in the previous hour.

# Run the TVF via the Data API
credentials = ('admin', '<insert_pw>')
host = '<insert_host_url>'
api_endpoint = '/api/v2/query/rows'
url = 'https://' + host + api_endpoint

# Query + database
query = 'SELECT * FROM get_hourly_signups()'
sql_statement = {
    'sql': query,
    'database': 'my_database'
}

resp = requests.post(url, json=sql_statement, auth=credentials)
resp.raise_for_status()  # surface HTTP errors instead of failing while parsing
resp_dict = resp.json()['results'][0]['rows'][0]
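One edge case worth noting: if no users signed up in the previous hour, `rows` comes back empty and the indexing above raises an IndexError. A small guard avoids that; the `extract_first_row` helper below is our own addition, assuming the response shape the code above indexes into (`results` → `rows`).

```python
def extract_first_row(payload):
    # Pull the first row out of a /api/v2/query/rows response body;
    # return None when the query matched no rows
    rows = payload['results'][0]['rows']
    return rows[0] if rows else None

# Example payloads mimicking the Data API response shape
print(extract_first_row({'results': [{'rows': [{'total_signups': 2}]}]}))  # {'total_signups': 2}
print(extract_first_row({'results': [{'rows': []}]}))  # None
```

With this helper, the notebook can skip the INSERT (or write a zero) on hours with no signups instead of raising.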

Finally, we’ll insert the result from the Data API call into the hourly_user_signups table.

# Write the result to hourly_user_signups using a parameterized query
cur.execute(
    'INSERT INTO hourly_user_signups VALUES (%s, %s)',
    (resp_dict['signup_datetime'], resp_dict['total_signups'])
)
conn.close()

We’ve used the Data API along with a TVF here to follow on from part I of this blog. However, a simpler way to accomplish the same task would be to use a SQL cell in the notebook and run a single INSERT INTO … SELECT statement.
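For example, the whole Python flow above could collapse to one SQL cell along these lines (a sketch, using the tables and TVF defined in step one):

```sql
-- Write the previous hour's aggregate straight from the TVF,
-- no Data API round trip required
INSERT INTO hourly_user_signups
SELECT * FROM get_hourly_signups();
```

This keeps the aggregation entirely inside the database, which avoids the HTTP call and the credential handling in the notebook.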

Step 3. Use SingleStore’s Job Service to schedule running the notebook

Now that we have built the notebook to execute our task, we just need to schedule it to continuously update the hourly_user_signups table with the most recent aggregate data. This can be done via SingleStore’s Job Service which allows you to run your notebooks on a time-based schedule.

Jobs can be created in the SingleStore UI. On the left-hand panel, click the “Jobs” button, then click on “Schedule”. From there, a menu will pop up where you can select:

  • The notebook you want to schedule a job for
  • The frequency at which you want to run the notebook
  • The workspace and database to run the job against

In our example, we have selected the job to run once an hour at five minutes past the top of the hour. This allows the TVF to pull all user signups from the previous hour, inserting the aggregated result into the hourly_user_signups table.

In this blog, we have used SingleStore’s Job Service to create an aggregated view of the users table. Other common use cases for the Job Service include:

  • Running data preparation and ML flows on your data stored in SingleStore
  • Performing transforms and caching results in your SingleStore database
  • Sending automated alerts
  • Ingesting data from various sources into SingleStore
  • Creating and managing workspace groups and workspaces via the Management API

Part II of this blog demonstrates how you can schedule repetitive tasks to achieve functionality similar to Rockset’s scheduled Query Lambdas. It is important to note that any SQL or Python code can be executed within SingleStore notebooks; users are not limited to using the Data API in conjunction with TVFs to create scheduled jobs.

If you’re looking for a Rockset alternative (or are migrating from any other database) and are ready to start, feel free to schedule time with a member of the SingleStore team here.

Appendix

