Unified Data Analysis: SQL & NoSQL on a Single Database with Kai
What you will learn in this notebook:
In this notebook we ingest data from different sources such as MySQL, MongoDB and S3, and perform efficient analysis using both NoSQL and SQL on multimodal data (tabular and JSON).
Highlights
Set up CDC from MongoDB and MySQL in a few easy steps. Replicate data in real time so that analytics always run on up-to-date information, without complex tooling for data movement.
Analyze data using both NoSQL and relational approaches, depending on your specific needs. Developers and data analysts who are familiar with different query approaches, such as the MongoDB query language and SQL, can work together on the same database. Run familiar SQL queries on your NoSQL data!
Ready to unlock real-time analytics and unified data access? Let's start!
In [1]:
!pip install pymongo prettytable matplotlib --quiet
Create database for importing data from different sources
This example gets banking data from three different sources: ATM locations from S3, transaction data from MySQL and user profile details from MongoDB. It then joins the data from these sources to generate insights about transactional activity across user profiles and ATM locations around the globe.
In [2]:
%%sql
DROP DATABASE IF EXISTS BankingAnalytics;
CREATE DATABASE BankingAnalytics;
Action Required
Make sure to select the 'BankingAnalytics' database from the drop-down menu at the top of this notebook. This updates the connection_url so that it connects to that database.
Set up CDC from MySQL
SingleStore allows you to ingest data from MySQL using pipelines.
In this step, we create a link to the MySQL instance and start the pipelines for CDC.
In [3]:
%%sql
CREATE LINK mysqllink AS MYSQL
CONFIG '{
    "database.hostname": "3.132.226.181",
    "database.exclude.list": "mysql,performance_schema",
    "table.include.list": "DomainAnalytics.transactions",
    "database.port": 3306,
    "database.ssl.mode": "required"
}'
CREDENTIALS '{
    "database.password": "Password@123",
    "database.user": "repl_user"
}';
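The cell above embeds the replication user's password directly in the notebook. As a minimal sketch of one alternative, the same `CREATE LINK` statement can be assembled in Python with the credentials read from environment variables. The helper `build_mysql_link_sql` and the variable names `MYSQL_REPL_USER` and `MYSQL_REPL_PASSWORD` are hypothetical and not part of SingleStore; the config keys are the ones used in this notebook.

```python
import json
import os


def build_mysql_link_sql(link_name, config, user, password):
    """Return a CREATE LINK statement shaped like the one in this notebook."""
    credentials = {"database.user": user, "database.password": password}
    # Single quotes are doubled so a quote inside a value does not end the
    # SQL string literal early. This is not a complete SQL escaping routine.
    config_json = json.dumps(config).replace("'", "''")
    credentials_json = json.dumps(credentials).replace("'", "''")
    return (
        f"CREATE LINK {link_name} AS MYSQL\n"
        f"CONFIG '{config_json}'\n"
        f"CREDENTIALS '{credentials_json}';"
    )


# Config values copied from the notebook cell.
mysql_config = {
    "database.hostname": "3.132.226.181",
    "database.exclude.list": "mysql,performance_schema",
    "table.include.list": "DomainAnalytics.transactions",
    "database.port": 3306,
    "database.ssl.mode": "required",
}

# Hypothetical environment variable names; set them outside the notebook.
statement = build_mysql_link_sql(
    "mysqllink",
    mysql_config,
    os.environ.get("MYSQL_REPL_USER", "repl_user"),
    os.environ.get("MYSQL_REPL_PASSWORD", ""),
)
```

The resulting `statement` string can then be executed with whichever SQL client the notebook already uses, so the password never appears in a saved cell.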
In [4]:
%%sql
CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK mysqllink "*" FORMAT AVRO;
In [5]:
%%sql
START ALL PIPELINES;
Migrate the data from S3 storage to SingleStore using Pipelines
This step loads data from S3. It requires the target tables to be defined beforehand.
In [6]:
%%sql
CREATE TABLE IF NOT EXISTS atm_locations (
    id INT PRIMARY KEY,
    name VARCHAR(255),
    address VARCHAR(255),
    city VARCHAR(255),
    country VARCHAR(255),
    latitude DECIMAL(9, 6),
    longitude DECIMAL(9, 6)
);
In [7]:
%%sql
CREATE PIPELINE atmlocations AS
LOAD DATA S3 's3://ocbfinalpoc1/data'
CONFIG '{"region":"ap-southeast-1"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE atm_locations;
In [8]:
%%sql
START PIPELINE atmlocations
Set up CDC from MongoDB to SingleStore
Now we set up CDC from MongoDB to replicate the data to SingleStore.
The collections to be replicated are specified in "collection.include.list", either as a comma-separated list or in wildcard format.
In [9]:
%%sql
CREATE LINK mongo AS MONGODB
CONFIG '{
    "mongodb.hosts": "ac-t7n47to-shard-00-00.tfutgo0.mongodb.net:27017,ac-t7n47to-shard-00-01.tfutgo0.mongodb.net:27017,ac-t7n47to-shard-00-02.tfutgo0.mongodb.net:27017",
    "collection.include.list": "bank.*",
    "mongodb.ssl.enabled": "true",
    "mongodb.authsource": "admin",
    "mongodb.members.auto.discover": "true"
}'
CREDENTIALS '{
    "mongodb.user": "mongo_sample_reader",
    "mongodb.password": "SingleStoreRocks27017"
}';
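The value of "collection.include.list" can either name collections explicitly or use a wildcard for a whole database. The sketch below shows one way to build that string in Python; the helper `collection_include_list` is hypothetical, and the collection names `profile` and `history` are taken from the tables queried later in this notebook.

```python
def collection_include_list(database, collections=None):
    """Build a value for the "collection.include.list" config key.

    With no collection names, the wildcard form "<database>.*" is returned.
    Otherwise each name is qualified with the database and comma-joined.
    """
    if not collections:
        return f"{database}.*"
    return ",".join(f"{database}.{name}" for name in collections)


# Wildcard form, as used in the link above.
all_collections = collection_include_list("bank")

# Explicit form, limited to two collections.
two_collections = collection_include_list("bank", ["profile", "history"])
```

Listing collections explicitly keeps the replication scope narrow, while the wildcard form picks up every collection in the database.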
In [10]:
%%sql
CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK mongo '*' FORMAT AVRO;
In [11]:
%%sql
SHOW PIPELINES
In [12]:
%%sql
START ALL PIPELINES
Check for records in tables
Data from MySQL
In [13]:
%%sql
SELECT COUNT(*) FROM transactions
In [14]:
%%sql
SELECT * FROM transactions WHERE transaction_type LIKE '%Deposit%' LIMIT 1;
Data from S3
In [15]:
%%sql
SELECT COUNT(*) FROM atm_locations
In [16]:
%%sql
SELECT * FROM atm_locations LIMIT 1;
Data from MongoDB
In [17]:
%%sql
SELECT _id:>JSON, _more:>JSON FROM profile LIMIT 1;
In [18]:
%%sql
SELECT _id:>JSON, _more:>JSON FROM history LIMIT 1;
Join tables from different sources using SQL queries
SQL Query 1: View user details and their associated ATMs
In [19]:
%%sql
SELECT
    p._more::$full_name AS NameOfPerson,
    p._more::$email AS Email,
    a.id,
    a.name AS ATMName,
    a.city,
    a.country
FROM
    profile p,
    atm_locations a
WHERE
    p._more::$account_id = a.id
LIMIT 10;
SQL Query 2: View user details, their associated ATMs and transaction details
In [20]:
%%sql
SELECT
    p._more::$full_name AS NameOfPerson,
    p._more::$email AS Email,
    a.id,
    a.name AS ATMName,
    a.city,
    t.transaction_id,
    t.transaction_date,
    t.amount,
    t.transaction_type,
    t.description
FROM
    profile p
JOIN
    atm_locations a ON p._more::$account_id = a.id
LEFT JOIN
    transactions t ON p._more::$account_id = t.account_id
LIMIT 10;
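Query 2 combines an inner join (profiles to ATM locations) with a left join (profiles to transactions), so a user without any transactions still appears, with empty transaction columns. The sketch below illustrates that behavior with Python's built-in sqlite3 module rather than SingleStore. The tables are simplified, the rows are made-up placeholders, and `account_id` is a plain column here because SQLite has no `::$` JSON operator.

```python
import sqlite3

# In-memory database with simplified, hypothetical versions of the tables.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE profile (account_id INTEGER, full_name TEXT);
    CREATE TABLE atm_locations (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE transactions (
        transaction_id INTEGER, account_id INTEGER, amount REAL
    );
    INSERT INTO profile VALUES (1, 'User A'), (2, 'User B');
    INSERT INTO atm_locations VALUES (1, 'ATM One'), (2, 'ATM Two');
    -- Only account 1 has a transaction.
    INSERT INTO transactions VALUES (10, 1, 25.0);
    """
)

rows = conn.execute(
    """
    SELECT p.full_name, a.name, t.transaction_id, t.amount
    FROM profile p
    JOIN atm_locations a ON p.account_id = a.id
    LEFT JOIN transactions t ON p.account_id = t.account_id
    ORDER BY p.account_id
    """
).fetchall()
conn.close()

for row in rows:
    print(row)
```

In this toy data the second user has no transactions, so the left join fills the transaction columns with `None` instead of dropping the row.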
Run queries in Mongo Query Language using Kai
In [21]:
from pymongo import MongoClient
import pprint
from prettytable import PrettyTable

client = MongoClient(connection_url_kai)

# Get the profile collection
db = client['BankingAnalytics']
profile_coll = db['profile']

for profile in profile_coll.find().limit(1):
    pprint.pprint(profile)
In [22]:
pipeline = [
    # Join each history document with the profiles that share its account_id
    {
        "$lookup": {
            "from": "profile",
            "localField": "account_id",
            "foreignField": "account_id",
            "as": "profile_data"
        }
    },
    {"$limit": 5},
    # Keep the history document and the first matching profile
    {
        "$group": {
            "_id": "$_id",
            "history_data": {"$first": "$$ROOT"},
            "profile_data": {"$first": {"$arrayElemAt": ["$profile_data", 0]}}
        }
    },
    {
        "$project": {
            "_id": "$history_data._id",
            "account_id": "$history_data.account_id",
            "history_data": "$history_data",
            "profile_data": "$profile_data"
        }
    }
]

# Execute the aggregation pipeline
result = list(db.history.aggregate(pipeline))

# Print the result in a tabular format
table = PrettyTable(["Account ID", "Full Name", "Date of Birth", "City", "State", "Country", "Postal Code", "Phone Number", "Email"])
for doc in result:
    # Fall back to an empty dict when no profile matched this account
    profile_data = doc.get("profile_data") or {}
    table.add_row([
        doc["account_id"],
        profile_data.get("full_name", ""),
        profile_data.get("date_of_birth", ""),
        profile_data.get("city", ""),
        profile_data.get("state", ""),
        profile_data.get("country", ""),
        profile_data.get("postal_code", ""),
        profile_data.get("phone_number", ""),
        profile_data.get("email", "")
    ])
print(table)
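To make the `$lookup` stage more concrete, here is a plain-Python sketch of what a basic equality `$lookup` computes: each local document gets a list of the foreign documents whose join field has the same value. The function `lookup` and the sample documents are hypothetical and only illustrate the semantics; Kai evaluates the real stage on the server.

```python
def lookup(local_docs, foreign_docs, local_field, foreign_field, as_field):
    """Emulate a basic equality $lookup (a left outer join) on lists of dicts."""
    # Index the foreign documents by their join key for fast matching.
    index = {}
    for doc in foreign_docs:
        index.setdefault(doc.get(foreign_field), []).append(doc)

    joined = []
    for doc in local_docs:
        out = dict(doc)  # shallow copy so the input is left untouched
        # Every local document is kept; unmatched ones get an empty list.
        out[as_field] = list(index.get(doc.get(local_field), []))
        joined.append(out)
    return joined


# Made-up sample documents, for illustration only.
history = [
    {"_id": 1, "account_id": 101},
    {"_id": 2, "account_id": 999},
]
profiles = [
    {"_id": "a", "account_id": 101, "full_name": "Example User"},
]

result = lookup(history, profiles, "account_id", "account_id", "profile_data")
for doc in result:
    print(doc["account_id"], [p["full_name"] for p in doc["profile_data"]])
```

A history document without a matching profile ends up with an empty `profile_data` list, so code that reads the first element should handle that case.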
In [23]:
# Get the five states with the highest number of customers
from bson.son import SON

pipeline = [
    {"$group": {"_id": "$state", "count": {"$sum": 1}}},
    {"$sort": SON([("count", -1), ("_id", -1)])},
    {"$limit": 5}
]

pprint.pprint(list(profile_coll.aggregate(pipeline)))
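The three stages in this pipeline (`$group`, `$sort`, `$limit`) can be sketched in plain Python to show what the aggregation returns. The function `top_states` and the sample documents are hypothetical; the real work is done by the database.

```python
from collections import Counter


def top_states(profiles, limit=5):
    """Emulate $group by state with a count, $sort descending, then $limit."""
    # $group: count documents per state value.
    counts = Counter(doc.get("state") for doc in profiles)
    rows = [{"_id": state, "count": n} for state, n in counts.items()]
    # $sort: by count descending, then by _id descending to break ties.
    rows.sort(key=lambda row: (row["count"], row["_id"] or ""), reverse=True)
    # $limit: keep only the first `limit` rows.
    return rows[:limit]


# Made-up sample documents, for illustration only.
sample_profiles = [
    {"state": "CA"}, {"state": "CA"}, {"state": "NY"},
    {"state": "TX"}, {"state": "NY"}, {"state": "CA"},
]

top_two = top_states(sample_profiles, limit=2)
print(top_two)
```

The secondary sort on `_id` makes the order deterministic when two states have the same number of customers.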
In [24]:
import matplotlib.pyplot as plt

data = list(profile_coll.aggregate(pipeline))
print(data)

# The pipeline groups by state, so each _id is a state name
states, counts = [d['_id'] for d in data], [d['count'] for d in data]
plt.bar(states, counts)
plt.show()
With SingleStore Kai, you can power analytics on SQL and NoSQL data using the API of your choice.
Details
About this Template
Perform both SQL and NoSQL queries on multi-modal relational and JSON data
This Notebook can be run in Standard and Enterprise deployments.
License
This Notebook has been released under the Apache 2.0 open source license.