
Unified Data Analysis: SQL & NoSQL on a Single Database with Kai

What you will learn in this notebook:
In this notebook we ingest data from different sources (MySQL, MongoDB and S3) and analyze multimodal data (tabular and JSON) using both NoSQL and SQL.
Highlights
Set up CDC from MongoDB and MySQL in a few steps. Replicate data in real time so analytics always run on up-to-date information, without complex tooling for data movement.
Analyze data using both NoSQL and relational approaches, depending on your specific needs. Developers and data analysts who are familiar with different query approaches, such as the MongoDB query language and SQL, can work together on the same database. Run familiar SQL queries on your NoSQL data!
Ready to unlock real-time analytics and unified data access? Let's start!
In [1]:
!pip install pymongo prettytable matplotlib --quiet
Create database for importing data from different sources
This example pulls banking data from three different sources: ATM locations from S3, transaction data from MySQL and user profile details from MongoDB. It then joins the data from these sources to generate insights about transactional activity across user profiles and ATM locations around the globe.
In [2]:
%%sql
DROP DATABASE IF EXISTS BankingAnalytics;
CREATE DATABASE BankingAnalytics;
Action Required
Make sure to select 'BankingAnalytics' database from the drop-down menu at the top of this notebook. It updates the connection_url to connect to that database.

Set up CDC from MySQL
SingleStore can ingest data from MySQL using pipelines.
In this step, we create a link to the MySQL instance, then create and start the pipelines for CDC.
In [3]:
%%sql
CREATE LINK mysqllink AS MYSQL
CONFIG '{
    "database.hostname": "3.132.226.181",
    "database.exclude.list": "mysql,performance_schema",
    "table.include.list": "DomainAnalytics.transactions",
    "database.port": 3306,
    "database.ssl.mode":"required"
}'
CREDENTIALS '{
    "database.password": "Password@123",
    "database.user": "repl_user"
}';
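The CONFIG and CREDENTIALS clauses above are JSON documents embedded in a SQL string. As a minimal sketch, the statement could also be assembled in Python so that the JSON is always well formed and the credentials come from the environment rather than the notebook source. The helper name `build_create_link` and the environment variable names `MYSQL_REPL_USER` and `MYSQL_REPL_PASSWORD` are assumptions made for illustration; they are not part of SingleStore or of this template.

```python
import json
import os


def build_create_link(name, hostname, port, user, password):
    """Return a CREATE LINK statement for a MySQL source (illustrative helper)."""
    config = {
        "database.hostname": hostname,
        "database.exclude.list": "mysql,performance_schema",
        "table.include.list": "DomainAnalytics.transactions",
        "database.port": port,
        "database.ssl.mode": "required",
    }
    credentials = {"database.password": password, "database.user": user}
    # json.dumps guarantees well-formed JSON. Values containing single quotes
    # or backslashes would still need escaping before use in a SQL literal.
    return (
        f"CREATE LINK {name} AS MYSQL\n"
        f"CONFIG '{json.dumps(config)}'\n"
        f"CREDENTIALS '{json.dumps(credentials)}';"
    )


# Hypothetical environment variable names; the fallbacks are placeholders.
statement = build_create_link(
    name="mysqllink",
    hostname="3.132.226.181",
    port=3306,
    user=os.environ.get("MYSQL_REPL_USER", "repl_user"),
    password=os.environ.get("MYSQL_REPL_PASSWORD", "<password>"),
)
print(statement)
```

The printed statement can then be pasted into a `%%sql` cell or executed through whichever SQL client the notebook already uses.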
In [4]:
%%sql
CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK mysqllink "*" FORMAT AVRO;
In [5]:
%%sql
START ALL PIPELINES;
Migrate the data from S3 storage to SingleStore using Pipelines
This step loads data from S3. Unlike the CDC pipelines, it requires the target table to be defined beforehand.
In [6]:
%%sql
CREATE TABLE IF NOT EXISTS atm_locations (
    id INT PRIMARY KEY,
    name VARCHAR(255),
    address VARCHAR(255),
    city VARCHAR(255),
    country VARCHAR(255),
    latitude DECIMAL(9, 6),
    longitude DECIMAL(9, 6)
);
In [7]:
%%sql
CREATE PIPELINE atmlocations AS
LOAD DATA S3 's3://ocbfinalpoc1/data'
CONFIG '{"region":"ap-southeast-1"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE atm_locations;
In [8]:
1
%%sql
START PIPELINE atmlocations;
Set up CDC from MongoDB to SingleStore
Now we set up CDC from MongoDB to replicate the data to SingleStore.
The collections to be replicated are specified in "collection.include.list", either as a comma-separated list or in wildcard format.
In [9]:
%%sql
CREATE LINK mongo AS MONGODB
CONFIG '{
    "mongodb.hosts":"ac-t7n47to-shard-00-00.tfutgo0.mongodb.net:27017,ac-t7n47to-shard-00-01.tfutgo0.mongodb.net:27017,ac-t7n47to-shard-00-02.tfutgo0.mongodb.net:27017",
    "collection.include.list": "bank.*",
    "mongodb.ssl.enabled":"true",
    "mongodb.authsource":"admin",
    "mongodb.members.auto.discover": "true"
}'
CREDENTIALS '{
    "mongodb.user":"mongo_sample_reader",
    "mongodb.password":"SingleStoreRocks27017"
}';
In [10]:
%%sql
CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK mongo '*' FORMAT AVRO;
In [11]:
%%sql
SHOW PIPELINES;
In [12]:
%%sql
START ALL PIPELINES;
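Pipelines load data in the background, so the tables may still be empty for a short while after the pipelines are started. A minimal sketch of a polling helper that waits for the first rows to arrive is shown below. The helper `wait_for_rows` and its `fetch_count` callable are assumptions for illustration: `fetch_count` stands in for whatever code runs a query such as `SELECT COUNT(*) FROM transactions` and returns the number as an integer.

```python
import time


def wait_for_rows(fetch_count, timeout_s=60.0, interval_s=2.0, sleep=time.sleep):
    """Poll fetch_count() until it returns a positive number or the timeout expires.

    fetch_count is a caller-supplied callable (hypothetical here) that returns
    the current row count of a table as an int.
    """
    waited = 0.0
    while True:
        count = fetch_count()
        if count > 0:
            return count
        if waited >= timeout_s:
            raise TimeoutError(f"no rows after {timeout_s} seconds")
        sleep(interval_s)
        waited += interval_s


# Stand-in for a real query: reports 0 rows twice, then 5 rows.
fake_counts = iter([0, 0, 5])
rows = wait_for_rows(lambda: next(fake_counts), sleep=lambda s: None)
print(rows)
```

The `sleep` parameter is injectable only so that the example runs instantly; in a notebook the default `time.sleep` would be used.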
Check for records in tables
Data from MySQL
In [13]:
%%sql
SELECT COUNT(*) FROM transactions;
In [14]:
%%sql
SELECT * FROM transactions WHERE transaction_type LIKE '%Deposit%' LIMIT 1;
Data from S3
In [15]:
%%sql
SELECT COUNT(*) FROM atm_locations;
In [16]:
%%sql
SELECT * FROM atm_locations LIMIT 1;
Data from MongoDB
In [17]:
%%sql
SELECT _id:>JSON, _more:>JSON FROM profile LIMIT 1;
In [18]:
%%sql
SELECT _id:>JSON, _more:>JSON FROM history LIMIT 1;
Join tables from different sources using SQL queries
SQL Query 1: View user details and their associated ATMs
In [19]:
%%sql
SELECT
    p._more::$full_name AS NameOfPerson,
    p._more::$email AS Email,
    a.id,
    a.name AS ATMName,
    a.city,
    a.country
FROM
    profile p,
    atm_locations a
WHERE
    p._more::$account_id = a.id
LIMIT 10;
SQL Query 2: View user details, their associated ATMs and their transaction details
In [20]:
%%sql
SELECT
    p._more::$full_name AS NameOfPerson,
    p._more::$email AS Email,
    a.id,
    a.name AS ATMName,
    a.city,
    t.transaction_id,
    t.transaction_date,
    t.amount,
    t.transaction_type,
    t.description
FROM
    profile p
JOIN
    atm_locations a ON p._more::$account_id = a.id
LEFT JOIN
    transactions t ON p._more::$account_id = t.account_id
LIMIT 10;
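Query 2 combines an inner JOIN (profile to ATM) with a LEFT JOIN (profile to transactions). The sketch below emulates that behavior in plain Python to show which rows survive each step. All rows are made up for illustration and are not taken from the dataset; only the column names mirror the queries above.

```python
# Made-up rows, for illustration only.
profiles = [
    {"account_id": 1, "full_name": "Person A"},
    {"account_id": 2, "full_name": "Person B"},
    {"account_id": 3, "full_name": "Person C"},
]
atms = {1: "ATM One", 2: "ATM Two"}  # keyed by atm_locations.id
transactions = [
    {"account_id": 1, "amount": 50.0},
    {"account_id": 1, "amount": 20.0},
]

rows = []
for p in profiles:
    atm_name = atms.get(p["account_id"])
    if atm_name is None:
        continue  # inner JOIN: a profile without a matching ATM row is dropped
    matches = [t for t in transactions if t["account_id"] == p["account_id"]]
    if not matches:
        matches = [{"amount": None}]  # LEFT JOIN: keep the row, fill with NULL
    for t in matches:
        rows.append((p["full_name"], atm_name, t["amount"]))

for row in rows:
    print(row)
```

In this toy data, Person A appears once per transaction, Person B appears once with an empty amount, and Person C is dropped because no ATM row matches.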
Run queries in Mongo Query Language using Kai
In [21]:
from pymongo import MongoClient
import pprint
from prettytable import PrettyTable

client = MongoClient(connection_url_kai)

# Get the profile collection
db = client['BankingAnalytics']
profile_coll = db['profile']

for profile in profile_coll.find().limit(1):
    pprint.pprint(profile)
In [22]:
pipeline = [
    # Attach the matching profile documents to each history document
    {
        "$lookup": {
            "from": "profile",
            "localField": "account_id",
            "foreignField": "account_id",
            "as": "profile_data"
        }
    },
    {
        "$limit": 5
    },
    # Keep one history document and its first matching profile per _id
    {
        "$group": {
            "_id": "$_id",
            "history_data": {"$first": "$$ROOT"},
            "profile_data": {"$first": {"$arrayElemAt": ["$profile_data", 0]}}
        }
    },
    {
        "$project": {
            "_id": "$history_data._id",
            "account_id": "$history_data.account_id",
            "history_data": "$history_data",
            "profile_data": "$profile_data"
        }
    }
]

# Execute the aggregation pipeline
result = list(db.history.aggregate(pipeline))

# Print the result in a tabular format
table = PrettyTable(["Account ID", "Full Name", "Date of Birth", "City", "State", "Country", "Postal Code", "Phone Number", "Email"])
for doc in result:
    # Fall back to an empty dict when no profile matched the history document
    profile_data = doc.get("profile_data") or {}
    table.add_row([
        doc["account_id"],
        profile_data.get("full_name", ""),
        profile_data.get("date_of_birth", ""),
        profile_data.get("city", ""),
        profile_data.get("state", ""),
        profile_data.get("country", ""),
        profile_data.get("postal_code", ""),
        profile_data.get("phone_number", ""),
        profile_data.get("email", "")
    ])

print(table)
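For readers more familiar with SQL, the `$lookup` and `$arrayElemAt` stages in the pipeline above can be hard to picture. The sketch below emulates them in plain Python on made-up documents. The `lookup` function is a simplified, hypothetical stand-in written for this illustration; it is not how MongoDB or Kai implements the stage, and it uses `None` where the real `$arrayElemAt` would produce a missing value.

```python
# Made-up documents, for illustration only.
history = [
    {"_id": "h1", "account_id": 10},
    {"_id": "h2", "account_id": 99},
]
profile = [
    {"_id": "p1", "account_id": 10, "full_name": "Person A"},
]


def lookup(local_docs, foreign_docs, local_field, foreign_field, as_field):
    """Emulate $lookup: attach the list of matching foreign documents to each local document."""
    joined = []
    for doc in local_docs:
        matches = [f for f in foreign_docs if f.get(foreign_field) == doc.get(local_field)]
        joined.append({**doc, as_field: matches})
    return joined


joined = lookup(history, profile, "account_id", "account_id", "profile_data")

# Emulate {"$arrayElemAt": ["$profile_data", 0]}: the first match, or None if the array is empty.
first_profiles = [d["profile_data"][0] if d["profile_data"] else None for d in joined]

for doc, first in zip(joined, first_profiles):
    print(doc["_id"], first)
```

The second history document has no matching profile, which is why code that reads `profile_data` after such a pipeline should be prepared for an absent value.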
In [23]:
# Get the states with the highest number of customers
from bson.son import SON

pipeline = [
    {"$group": {"_id": "$state", "count": {"$sum": 1}}},
    {"$sort": SON([("count", -1), ("_id", -1)])},
    {"$limit": 5}
]

pprint.pprint(list(profile_coll.aggregate(pipeline)))
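The `$group`, `$sort` and `$limit` stages above correspond to a count per key, a descending sort and a top-N cut. As a rough sketch, the same computation in plain Python looks like this. The state values are made up for illustration, and the limit is 2 instead of 5 to keep the sample small.

```python
from collections import Counter

# Made-up profile documents, for illustration only.
profiles = [{"state": s} for s in ["TX", "CA", "TX", "NY", "CA", "TX", "WA"]]

# $group with {"$sum": 1}: count documents per state
counts = Counter(doc["state"] for doc in profiles)

# $sort on count descending, then _id descending
ordered = sorted(counts.items(), key=lambda kv: (kv[1], kv[0]), reverse=True)

# $limit: keep the top entries, shaped like the aggregation output
top = [{"_id": state, "count": n} for state, n in ordered[:2]]
print(top)
```

Sorting on `_id` as a secondary key, as the original pipeline does, keeps the order deterministic when two states have the same count.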
In [24]:
import matplotlib.pyplot as plt

# Reuse the top-states pipeline defined in the previous cell
data = list(profile_coll.aggregate(pipeline))

print(data)

# The pipeline groups by state, so "_id" holds the state name
states = [doc['_id'] for doc in data]
counts = [doc['count'] for doc in data]

plt.bar(states, counts)
plt.show()
With SingleStore Kai, you can power analytics on SQL and NoSQL data using the API of your choice.

Details
About this Template
Perform both SQL and NoSQL queries on multi-modal relational and JSON data
This Notebook can be run in Standard and Enterprise deployments.
License
This Notebook has been released under the Apache 2.0 open source license.