Unified Data Analysis: SQL & NoSQL on a Single Database with Kai

What you will learn in this notebook:

In this notebook we ingest data from different sources (MySQL, MongoDB, and S3) and analyze multimodal data (tabular and JSON) efficiently using both NoSQL and SQL.

Highlights

  1. Set up CDC from MongoDB and MySQL in a few steps. Data is replicated in real time, so analytics run on up-to-date information without complex tooling for data movement.

  2. Analyze data using NoSQL or relational approaches, depending on your needs. Developers and data analysts who are familiar with different query approaches, such as the MongoDB query language and SQL, can work together on the same database. Run familiar SQL queries on your NoSQL data!

Ready to unlock real-time analytics and unified data access? Let's start!

In [1]:

!pip install pymongo prettytable matplotlib --quiet

Create database for importing data from different sources

This example gets banking data from three different sources: ATM locations from S3, transaction data from MySQL, and user profile details from MongoDB. It then joins the data from these sources to generate insights about transactional activity across user profiles and ATM locations around the globe.

In [2]:

%%sql
DROP DATABASE IF EXISTS BankingAnalytics;
CREATE DATABASE BankingAnalytics;

Action Required

Make sure to select the 'BankingAnalytics' database from the drop-down menu at the top of this notebook. Doing so updates connection_url to connect to that database.
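One way to confirm that the drop-down selection took effect is to look at the database name carried in the connection URL. The sketch below is only an illustration: the URL value and the `database_from_url` helper are hypothetical, and in the notebook you would pass the `connection_url` variable instead.

```python
from urllib.parse import urlparse


def database_from_url(url: str) -> str:
    """Return the database name found in the path part of a connection URL."""
    return urlparse(url).path.lstrip("/")


# Hypothetical URL for illustration only; in the notebook, use connection_url.
example_url = "singlestoredb://user:secret@host.example.com:3306/BankingAnalytics"
print(database_from_url(example_url))
```

If the printed name is not the database you expect, reselect it from the drop-down menu before running the remaining cells.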

Set up CDC from MySQL

SingleStore lets you ingest data from MySQL using pipelines.

In this step, we create a link to the MySQL instance and start the pipelines for CDC.

In [3]:

%%sql
CREATE LINK mysqllink AS MYSQL
CONFIG '{
"database.hostname": "3.141.19.255",
"database.exclude.list": "mysql,performance_schema",
"table.include.list": "DomainAnalytics.transactions",
"database.port": 3306,
"database.ssl.mode":"required"
}'
CREDENTIALS '{
"database.password": "Password@123",
"database.user": "repl_user"
}';
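The link definition above embeds the replication credentials directly in the cell. As a rough sketch of an alternative, the statement can be assembled in Python so that the user name and password come from the environment. The helper `build_mysql_link_sql` and the environment variable names are assumptions made for this illustration; they are not part of SingleStore or of this notebook.

```python
import json
import os


def build_mysql_link_sql(link_name, host, tables, user, password, port=3306):
    """Assemble a CREATE LINK statement from Python values (hypothetical helper).

    Note: values are not escaped for SQL, so this sketch assumes they contain
    no single quotes or backslashes.
    """
    config = {
        "database.hostname": host,
        "database.exclude.list": "mysql,performance_schema",
        "table.include.list": ",".join(tables),
        "database.port": port,
        "database.ssl.mode": "required",
    }
    credentials = {"database.password": password, "database.user": user}
    return (
        f"CREATE LINK {link_name} AS MYSQL\n"
        f"CONFIG '{json.dumps(config)}'\n"
        f"CREDENTIALS '{json.dumps(credentials)}';"
    )


# MYSQL_REPL_USER and MYSQL_REPL_PASSWORD are assumed variable names.
sql = build_mysql_link_sql(
    "mysqllink",
    host="3.141.19.255",
    tables=["DomainAnalytics.transactions"],
    user=os.environ.get("MYSQL_REPL_USER", "repl_user"),
    password=os.environ.get("MYSQL_REPL_PASSWORD", "change-me"),
)
print(sql)
```

Building the CONFIG and CREDENTIALS payloads with `json.dumps` also guarantees that they are valid JSON, which is easy to get wrong when editing the string by hand.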

In [4]:

%%sql
CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK mysqllink "*" FORMAT AVRO;

In [5]:

%%sql
START ALL PIPELINES;

Migrate the data from S3 storage to SingleStore using Pipelines

This step loads data from S3. It requires the target table to be defined beforehand.

In [6]:

%%sql
CREATE TABLE IF NOT EXISTS atm_locations (
id INT PRIMARY KEY,
name VARCHAR(255),
address VARCHAR(255),
city VARCHAR(255),
country VARCHAR(255),
latitude DECIMAL(9, 6),
longitude DECIMAL(9, 6)
);

In [7]:

%%sql
CREATE PIPELINE atmlocations AS
LOAD DATA S3 's3://ocbfinalpoc1/data'
CONFIG '{"region":"ap-southeast-1"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE atm_locations;

In [8]:

%%sql
START PIPELINE atmlocations
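The pipeline above is created with SKIP DUPLICATE KEY ERRORS, so a source row whose primary key already exists in atm_locations is discarded instead of failing the load. The following sketch imitates that behavior on an in-memory dictionary. The sample rows and the `load_skipping_duplicates` helper are made up for illustration and say nothing about how SingleStore implements the option.

```python
def load_skipping_duplicates(table, rows, key="id"):
    """Insert rows into a dict-backed table, discarding rows whose key exists.

    Returns the list of discarded rows so they can be inspected.
    """
    skipped = []
    for row in rows:
        if row[key] in table:
            skipped.append(row)  # duplicate primary key: row dropped, no error
        else:
            table[row[key]] = row
    return skipped


# Hypothetical sample data; the second row with id 1 is a duplicate.
atm_table = {}
incoming = [
    {"id": 1, "name": "ATM One", "city": "Singapore"},
    {"id": 2, "name": "ATM Two", "city": "Jakarta"},
    {"id": 1, "name": "ATM One (resent)", "city": "Singapore"},
]
skipped_rows = load_skipping_duplicates(atm_table, incoming)
print(len(atm_table), "rows loaded,", len(skipped_rows), "skipped")
```

The first row seen for a key is the one that stays, which is why the table keeps "ATM One" rather than the resent copy.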

Set up CDC from MongoDB to SingleStore

Now we set up CDC from MongoDB to replicate the data to SingleStore.

The collections to be replicated are specified in "collection.include.list", either as a comma-separated list or in wildcard format.

In [9]:

%%sql
CREATE LINK mongo AS MONGODB
CONFIG '{
"mongodb.hosts":"ac-t7n47to-shard-00-00.tfutgo0.mongodb.net:27017,ac-t7n47to-shard-00-01.tfutgo0.mongodb.net:27017,ac-t7n47to-shard-00-02.tfutgo0.mongodb.net:27017",
"collection.include.list": "bank.*",
"mongodb.ssl.enabled":"true",
"mongodb.authsource":"admin",
"mongodb.members.auto.discover": "true"
}'
CREDENTIALS '{
"mongodb.user":"forimport",
"mongodb.password":"4Zfb0SKGCcDz5bBt"
}';
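To get a feel for which collections a value such as "bank.*" in "collection.include.list" would select, the sketch below matches patterns against fully qualified `database.collection` names. It assumes that each comma-separated entry behaves like a regular expression that must match the whole name; the source does not confirm the exact matching rules, and the `included` helper and sample names are hypothetical.

```python
import re


def included(namespace, include_list):
    """Return True if any comma-separated pattern matches the full namespace.

    Assumption: patterns are treated as regular expressions matched against
    the complete "database.collection" name.
    """
    patterns = [p.strip() for p in include_list.split(",") if p.strip()]
    return any(re.fullmatch(p, namespace) for p in patterns)


# Hypothetical namespaces to test against the pattern used in the link.
for name in ["bank.profile", "bank.history", "sales.orders"]:
    print(name, included(name, "bank.*"))
```

Under this assumption, a list such as "bank.profile,bank.history" would select the same two collections explicitly.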

In [10]:

%%sql
CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK mongo '*' FORMAT AVRO;

In [11]:

%%sql
SHOW PIPELINES

In [12]:

%%sql
START ALL PIPELINES

Check for records in tables

Data from MySQL

In [13]:

%%sql
SELECT COUNT(*) FROM transactions

In [14]:

%%sql
SELECT * FROM transactions WHERE transaction_type LIKE '%Deposit%' LIMIT 1;

Data from S3

In [15]:

%%sql
SELECT COUNT(*) FROM atm_locations

In [16]:

%%sql
SELECT * FROM atm_locations LIMIT 1;

Data from MongoDB

In [17]:

%%sql
SELECT _id:>JSON, _more:>JSON FROM profile LIMIT 1;

In [18]:

%%sql
SELECT _id:>JSON, _more:>JSON FROM history LIMIT 1;

Join tables from different sources using SQL queries

SQL Query 1: View user details and their associated ATMs

In [19]:

%%sql
SELECT
p._more::$full_name AS NameOfPerson,
p._more::$email AS Email,
a.id,
a.name AS ATMName,
a.city,
a.country
FROM
profile p,
atm_locations a
WHERE
p._more::$account_id = a.id
LIMIT 10;
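The query above reads document fields with expressions such as `_more::$full_name`. As a rough mental model, the sketch below splits a document into the `_id` and `_more` columns seen in the replicated tables and then looks up a top-level field as text. It assumes that `_more` holds the fields other than `_id`, and it uses JSON strings for readability even though the actual storage format may differ. The helpers and the sample document are hypothetical.

```python
import json


def to_row(document):
    """Split a document into the two columns of a replicated collection table."""
    rest = {k: v for k, v in document.items() if k != "_id"}
    return {"_id": json.dumps(document["_id"]), "_more": json.dumps(rest)}


def extract_string(row, field):
    """Rough analogue of _more::$field: return a top-level value as text."""
    value = json.loads(row["_more"]).get(field)
    return None if value is None else str(value)


# Hypothetical document; a real _id would typically be an ObjectId.
row = to_row({"_id": "abc123", "full_name": "Ada Example", "account_id": 7})
print(extract_string(row, "full_name"), extract_string(row, "account_id"))
```

Because the extracted value is text, the comparison `p._more::$account_id = a.id` in the query relies on the database converting between string and integer values.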

SQL Query 2: View user details, their associated ATMs, and transaction details

In [20]:

%%sql
SELECT
p._more::$full_name AS NameOfPerson,
p._more::$email AS Email,
a.id,
a.name AS ATMName,
a.city,
t.transaction_id,
t.transaction_date,
t.amount,
t.transaction_type,
t.description
FROM
profile p
JOIN
atm_locations a ON p._more::$account_id = a.id
LEFT JOIN
transactions t ON p._more::$account_id = t.account_id
LIMIT 10;
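The query above combines an inner JOIN with a LEFT JOIN, which affects which users appear in the result. The sketch below reproduces the same join shape on toy data using Python's built-in `sqlite3` module rather than SingleStore. The tables are simplified to plain columns and every value is made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE profile (account_id INTEGER, full_name TEXT);
CREATE TABLE atm_locations (id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE transactions (transaction_id INTEGER, account_id INTEGER, amount REAL);
INSERT INTO profile VALUES (1, 'Ada Example'), (2, 'Bo Example'), (3, 'Cy Example');
INSERT INTO atm_locations VALUES (1, 'ATM One'), (2, 'ATM Two');
INSERT INTO transactions VALUES (100, 1, 25.0), (101, 1, 40.0);
""")

# Same shape as SQL Query 2: inner join to ATMs, left join to transactions.
rows = conn.execute("""
SELECT p.full_name, a.name, t.transaction_id, t.amount
FROM profile p
JOIN atm_locations a ON p.account_id = a.id
LEFT JOIN transactions t ON p.account_id = t.account_id
ORDER BY p.account_id, t.transaction_id
""").fetchall()

for r in rows:
    print(r)
```

In this toy data, the user without a matching ATM is dropped by the inner JOIN, while the user with an ATM but no transactions is kept by the LEFT JOIN with empty transaction columns.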

Run queries in Mongo Query Language using Kai

In [21]:

from pymongo import MongoClient
import pprint
from prettytable import PrettyTable

# connection_url_kai is provided by the notebook environment
client = MongoClient(connection_url_kai)

# Get the profile collection
db = client['BankingAnalytics']
profile_coll = db['profile']

# Print one sample document
for profile in profile_coll.find().limit(1):
    pprint.pprint(profile)

In [22]:

# Join each history document with its matching profile on account_id
pipeline = [
    {
        "$lookup": {
            "from": "profile",
            "localField": "account_id",
            "foreignField": "account_id",
            "as": "profile_data"
        }
    },
    {
        "$limit": 5
    },
    {
        "$group": {
            "_id": "$_id",
            "history_data": {"$first": "$$ROOT"},
            "profile_data": {"$first": {"$arrayElemAt": ["$profile_data", 0]}}
        }
    },
    {
        "$project": {
            "_id": "$history_data._id",
            "account_id": "$history_data.account_id",
            "history_data": "$history_data",
            "profile_data": "$profile_data"
        }
    }
]

# Execute the aggregation pipeline
result = list(db.history.aggregate(pipeline))

# Print the result in a tabular format
table = PrettyTable(["Account ID", "Full Name", "Date of Birth", "City", "State", "Country", "Postal Code", "Phone Number", "Email"])
for doc in result:
    # A history document may have no matching profile
    profile_data = doc.get("profile_data") or {}
    table.add_row([
        doc["account_id"],
        profile_data.get("full_name", ""),
        profile_data.get("date_of_birth", ""),
        profile_data.get("city", ""),
        profile_data.get("state", ""),
        profile_data.get("country", ""),
        profile_data.get("postal_code", ""),
        profile_data.get("phone_number", ""),
        profile_data.get("email", "")
    ])
print(table)
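The `$lookup` stage in the pipeline above attaches to every history document an array of the profile documents that share its account_id, and `$arrayElemAt` then picks the first match. The sketch below imitates that on plain Python lists, without a database. The `lookup` and `first_or_none` helpers and the sample documents are hypothetical and cover only the simple equality form of `$lookup`.

```python
def lookup(left_docs, right_docs, local_field, foreign_field, as_field):
    """Attach to each left document the list of right documents that match."""
    joined = []
    for doc in left_docs:
        matches = [
            r for r in right_docs
            if r.get(foreign_field) == doc.get(local_field)
        ]
        joined.append({**doc, as_field: matches})
    return joined


def first_or_none(items):
    """Counterpart of taking element 0 of the joined array, if it exists."""
    return items[0] if items else None


# Hypothetical sample documents.
history = [{"account_id": 1, "event": "login"}, {"account_id": 9, "event": "login"}]
profiles = [{"account_id": 1, "full_name": "Ada Example"}]

joined = lookup(history, profiles, "account_id", "account_id", "profile_data")
for doc in joined:
    print(doc["account_id"], first_or_none(doc["profile_data"]))
```

The second history document has no matching profile, so its joined array is empty. This is the case in which the profile data can be missing when the table rows are built.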

In [23]:

# Get the five states with the highest number of customers
from bson.son import SON
pipeline = [
{"$group": {"_id": "$state", "count": {"$sum": 1}}},
{"$sort": SON([("count", -1), ("_id", -1)])},
{"$limit": 5}
]
pprint.pprint(list(profile_coll.aggregate(pipeline)))
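For readers more used to Python than to aggregation pipelines, the `$group`, `$sort`, and `$limit` stages above amount to counting documents per state, ordering by count (and then by state name) in descending order, and keeping the first five. The sketch below does the same on an in-memory list; the sample documents are invented for illustration.

```python
from collections import Counter

# Hypothetical profile documents reduced to the field being grouped on.
sample_profiles = [
    {"state": "CA"}, {"state": "NY"}, {"state": "CA"},
    {"state": "TX"}, {"state": "NY"}, {"state": "CA"},
]

# $group: count documents per state
counts = Counter(p["state"] for p in sample_profiles)

# $sort by count and _id, both descending, then $limit 5
top_states = sorted(
    ({"_id": state, "count": n} for state, n in counts.items()),
    key=lambda d: (d["count"], d["_id"]),
    reverse=True,
)[:5]

print(top_states)
```

The output has the same shape as the aggregation result: a list of documents with `_id` holding the state and `count` holding the number of customers.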

In [24]:

import matplotlib.pyplot as plt

# Re-run the state aggregation from the previous cell and plot the counts
data = list(profile_coll.aggregate(pipeline))
print(data)
states = [d['_id'] for d in data]
counts = [d['count'] for d in data]
plt.bar(states, counts)
plt.show()

With SingleStore Kai, you can power analytics on SQL and NoSQL data using the API of your choice.

Details

Tags

#cdc #mongo #sql #nosql #kai

License

This Notebook has been released under the Apache 2.0 open source license.