Unified Data Analysis: SQL & NoSQL on a Single Database with Kai

What you will learn in this notebook:

In this notebook we ingest data from different sources, such as MySQL, MongoDB, and S3, and perform efficient analysis using both NoSQL and SQL on multimodal data (tabular and JSON).

Highlights

  1. Set up CDC from MongoDB and MySQL in a few easy steps. Replicate data in real time so analytics always run on up-to-date information, without complex tooling for data movement.

  2. Analyze data using both NoSQL and relational approaches, depending on your specific needs. Developers and data analysts who are familiar with different query languages, such as the MongoDB query language and SQL, can work together on the same database. Perform familiar SQL queries on your NoSQL data!

Ready to unlock real-time analytics and unified data access? Let's start!

In [1]:

!pip install pymongo prettytable matplotlib --quiet

Create database for importing data from different sources

This example gets banking data from three different sources: ATM locations from S3, transaction data from MySQL, and user profile details from MongoDB. It then joins the data from these sources to generate insights about transactional activity across user profiles and locations around the globe.

In [2]:

%%sql
DROP DATABASE IF EXISTS BankingAnalytics;
CREATE DATABASE BankingAnalytics;

Action Required

Make sure to select 'BankingAnalytics' database from the drop-down menu at the top of this notebook. It updates the connection_url to connect to that database.
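One way to confirm that the notebook points at the right database is to read the database name back out of the connection URL. The sketch below assumes the URL follows the common scheme://user:password@host:port/database layout. The URL shown and the database_from_url helper are made-up placeholders, not values the notebook generates.

```python
from urllib.parse import urlparse


def database_from_url(url: str) -> str:
    """Return the database name encoded in the path of a connection URL."""
    return urlparse(url).path.lstrip("/")


# Hypothetical URL for illustration; in the notebook you would pass connection_url.
example_url = "singlestoredb://user:secret@example-host:3306/BankingAnalytics"
selected_db = database_from_url(example_url)
print(selected_db)
```

If the printed name is not BankingAnalytics, the drop-down selection has probably not been applied yet.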

Set up CDC from MySQL

SingleStore allows you to ingest data from MySQL using pipelines.

In this step, we create a link to the MySQL instance and start the pipelines for CDC.

In [3]:

%%sql
CREATE LINK mysqllink AS MYSQL
CONFIG '{
    "database.hostname": "3.132.226.181",
    "database.exclude.list": "mysql,performance_schema",
    "table.include.list": "DomainAnalytics.transactions",
    "database.port": 3306,
    "database.ssl.mode": "required"
}'
CREDENTIALS '{
    "database.password": "Password@123",
    "database.user": "repl_user"
}';
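Hand-written JSON inside a SQL string is easy to get wrong (a missing comma or quote breaks the whole statement). As a rough sketch, the statement can be assembled from Python dictionaries so that the JSON is always well formed. The build_create_link helper, the placeholder host, and the placeholder password are assumptions for illustration. The sketch simply refuses values containing single quotes rather than guessing at an escaping rule.

```python
import json


def build_create_link(name: str, source: str, config: dict, credentials: dict) -> str:
    """Assemble a CREATE LINK statement with JSON-encoded CONFIG and CREDENTIALS."""
    config_json = json.dumps(config, indent=2)
    credentials_json = json.dumps(credentials, indent=2)
    for blob in (config_json, credentials_json):
        # The JSON is embedded in single-quoted SQL strings, so reject quotes
        # instead of assuming a particular escaping rule.
        if "'" in blob:
            raise ValueError("single quotes are not supported in this sketch")
    return (
        f"CREATE LINK {name} AS {source}\n"
        f"CONFIG '{config_json}'\n"
        f"CREDENTIALS '{credentials_json}';"
    )


statement = build_create_link(
    "mysqllink",
    "MYSQL",
    {
        "database.hostname": "mysql.example.com",  # placeholder host
        "database.port": 3306,
        "table.include.list": "DomainAnalytics.transactions",
        "database.ssl.mode": "required",
    },
    {"database.user": "repl_user", "database.password": "<password>"},  # placeholders
)
print(statement)
```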

In [4]:

%%sql
CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK mysqllink "*" FORMAT AVRO;

In [5]:

%%sql
START ALL PIPELINES;

Migrate the data from S3 storage to SingleStore using Pipelines

This step loads data from S3, which requires the target tables to be defined beforehand.

In [6]:

%%sql
CREATE TABLE IF NOT EXISTS atm_locations (
    id INT PRIMARY KEY,
    name VARCHAR(255),
    address VARCHAR(255),
    city VARCHAR(255),
    country VARCHAR(255),
    latitude DECIMAL(9, 6),
    longitude DECIMAL(9, 6)
);

In [7]:

%%sql
CREATE PIPELINE atmlocations AS
LOAD DATA S3 's3://ocbfinalpoc1/data'
CONFIG '{"region":"ap-southeast-1"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE atm_locations;
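The pipeline maps the fields of each file onto the columns of the predefined table, and SKIP DUPLICATE KEY ERRORS discards rows whose primary key is already present instead of failing the load. The Python sketch below mimics that behavior on a few made-up CSV rows. The CSV layout, the sample values, and the load_rows helper are assumptions for illustration, not the contents of the S3 bucket or SingleStore's implementation.

```python
import csv
import io
from decimal import Decimal

# Column order mirrors the atm_locations table definition.
COLUMNS = ["id", "name", "address", "city", "country", "latitude", "longitude"]


def load_rows(csv_text: str, table: dict) -> int:
    """Insert CSV rows into `table` keyed by id and skip rows whose id already exists.

    Returns the number of rows skipped as duplicates.
    """
    skipped = 0
    for fields in csv.reader(io.StringIO(csv_text)):
        row = dict(zip(COLUMNS, fields))
        row["id"] = int(row["id"])
        row["latitude"] = Decimal(row["latitude"])
        row["longitude"] = Decimal(row["longitude"])
        if row["id"] in table:
            skipped += 1  # duplicate primary key: keep the existing row
            continue
        table[row["id"]] = row
    return skipped


# Made-up sample rows; the second row with id 1 is a duplicate.
sample = (
    "1,ATM One,1 Main St,Singapore,Singapore,1.290270,103.851959\n"
    "2,ATM Two,2 High St,London,United Kingdom,51.507351,-0.127758\n"
    "1,ATM One Copy,9 Other St,Singapore,Singapore,1.300000,103.800000\n"
)
atm_locations = {}
skipped = load_rows(sample, atm_locations)
print(len(atm_locations), "rows loaded,", skipped, "skipped")
```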

In [8]:

%%sql
START PIPELINE atmlocations

Set up CDC from MongoDB to SingleStore

Now we set up CDC from MongoDB to replicate the data to SingleStore.

The collections to be replicated are specified as a comma-separated list or in wildcard format in "collection.include.list".

In [9]:

%%sql
CREATE LINK mongo AS MONGODB
CONFIG '{
    "mongodb.hosts": "ac-t7n47to-shard-00-00.tfutgo0.mongodb.net:27017,ac-t7n47to-shard-00-01.tfutgo0.mongodb.net:27017,ac-t7n47to-shard-00-02.tfutgo0.mongodb.net:27017",
    "collection.include.list": "bank.*",
    "mongodb.ssl.enabled": "true",
    "mongodb.authsource": "admin",
    "mongodb.members.auto.discover": "true"
}'
CREDENTIALS '{
    "mongodb.user": "mongo_sample_reader",
    "mongodb.password": "SingleStoreRocks27017"
}';
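To get a feel for how a value such as "bank.*" in "collection.include.list" selects collections, the sketch below filters a few made-up namespaces. It assumes each comma-separated entry is a regular expression matched against the full database.collection name, as in Debezium-style connectors. That matching rule, the matches_include_list helper, and the sample namespaces are assumptions, so check the SingleStore documentation for the exact semantics.

```python
import re


def matches_include_list(namespace: str, include_list: str) -> bool:
    """Return True if `namespace` fully matches any pattern in the comma-separated list."""
    patterns = [p.strip() for p in include_list.split(",") if p.strip()]
    return any(re.fullmatch(p, namespace) for p in patterns)


# Hypothetical namespaces in the form database.collection.
namespaces = ["bank.profile", "bank.history", "sales.orders"]

selected = [ns for ns in namespaces if matches_include_list(ns, "bank.*")]
print(selected)
```

Under this assumption, an explicit list such as "bank.profile,bank.history" would select the same two collections as the wildcard.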

In [10]:

%%sql
CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK mongo '*' FORMAT AVRO;

In [11]:

%%sql
SHOW PIPELINES

In [12]:

%%sql
START ALL PIPELINES

Check for records in tables

Data from MySQL

In [13]:

%%sql
SELECT COUNT(*) FROM transactions

In [14]:

%%sql
SELECT * FROM transactions WHERE transaction_type LIKE '%Deposit%' LIMIT 1;

Data from S3

In [15]:

%%sql
SELECT COUNT(*) FROM atm_locations

In [16]:

%%sql
SELECT * FROM atm_locations LIMIT 1;

Data from MongoDB

In [17]:

%%sql
SELECT _id:>JSON, _more:>JSON FROM profile LIMIT 1;

In [18]:

%%sql
SELECT _id:>JSON, _more:>JSON FROM history LIMIT 1;

Join tables from different sources using SQL queries

SQL Query 1: View user details and their associated ATMs

In [19]:

%%sql
SELECT
    p._more::$full_name AS NameOfPerson,
    p._more::$email AS Email,
    a.id,
    a.name AS ATMName,
    a.city,
    a.country
FROM
    profile p,
    atm_locations a
WHERE
    p._more::$account_id = a.id
LIMIT 10;

SQL Query 2: View user details, their associated ATMs, and transaction details

In [20]:

%%sql
SELECT
    p._more::$full_name AS NameOfPerson,
    p._more::$email AS Email,
    a.id,
    a.name AS ATMName,
    a.city,
    t.transaction_id,
    t.transaction_date,
    t.amount,
    t.transaction_type,
    t.description
FROM
    profile p
JOIN
    atm_locations a ON p._more::$account_id = a.id
LEFT JOIN
    transactions t ON p._more::$account_id = t.account_id
LIMIT 10;
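The query above combines an inner JOIN (a profile must have a matching ATM) with a LEFT JOIN (a profile is kept even when it has no transactions). The sketch below reproduces that shape on a tiny in-memory SQLite database so the effect of each join is easy to see. The rows are made up, and plain columns stand in for the `_more::$field` JSON extraction, which is SingleStore syntax that SQLite does not share.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE profile (account_id INTEGER, full_name TEXT);
    CREATE TABLE atm_locations (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE transactions (transaction_id INTEGER, account_id INTEGER, amount REAL);

    INSERT INTO profile VALUES (1, 'Ada'), (2, 'Grace'), (3, 'Linus');
    INSERT INTO atm_locations VALUES (1, 'ATM One'), (2, 'ATM Two');
    INSERT INTO transactions VALUES (100, 1, 25.0), (101, 1, 40.0);
""")

rows = conn.execute("""
    SELECT p.full_name, a.name, t.transaction_id
    FROM profile p
    JOIN atm_locations a ON p.account_id = a.id
    LEFT JOIN transactions t ON p.account_id = t.account_id
    ORDER BY p.account_id, t.transaction_id
""").fetchall()

for row in rows:
    print(row)
conn.close()
```

In this sample, Ada appears once per transaction, Grace appears once with an empty transaction column, and Linus is dropped because no ATM row matches his account.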

Run queries in the MongoDB query language using Kai

In [21]:

from pymongo import MongoClient
import pprint
from prettytable import PrettyTable

# connection_url_kai is provided by the SingleStore notebook environment
client = MongoClient(connection_url_kai)

# Get the profile collection
db = client['BankingAnalytics']
profile_coll = db['profile']

# Print one sample document
for profile in profile_coll.find().limit(1):
    pprint.pprint(profile)

In [22]:

pipeline = [
    {
        "$lookup": {
            "from": "profile",
            "localField": "account_id",
            "foreignField": "account_id",
            "as": "profile_data"
        }
    },
    {
        "$limit": 5
    },
    {
        "$group": {
            "_id": "$_id",
            "history_data": {"$first": "$$ROOT"},
            "profile_data": {"$first": {"$arrayElemAt": ["$profile_data", 0]}}
        }
    },
    {
        "$project": {
            "_id": "$history_data._id",
            "account_id": "$history_data.account_id",
            "history_data": "$history_data",
            "profile_data": "$profile_data"
        }
    }
]

# Execute the aggregation pipeline
result = list(db.history.aggregate(pipeline))

# Print the result in a tabular format
table = PrettyTable(["Account ID", "Full Name", "Date of Birth", "City", "State", "Country", "Postal Code", "Phone Number", "Email"])
for doc in result:
    # profile_data can be missing or null when no profile matches the account
    profile_data = doc.get("profile_data") or {}
    table.add_row([
        doc.get("account_id", ""),
        profile_data.get("full_name", ""),
        profile_data.get("date_of_birth", ""),
        profile_data.get("city", ""),
        profile_data.get("state", ""),
        profile_data.get("country", ""),
        profile_data.get("postal_code", ""),
        profile_data.get("phone_number", ""),
        profile_data.get("email", "")
    ])

print(table)
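For readers less familiar with `$lookup`, the sketch below emulates its core behavior in plain Python: each document on the left side receives an array of right-side documents whose key matches, and `$arrayElemAt` with index 0 then picks the first match. The lookup helper and the sample documents are made up for illustration. This is a simplified model, not how Kai or MongoDB executes the stage.

```python
def lookup(left_docs, right_docs, local_field, foreign_field, as_field):
    """Attach to each left document the list of right documents with a matching key."""
    index = {}
    for doc in right_docs:
        index.setdefault(doc.get(foreign_field), []).append(doc)
    return [
        {**doc, as_field: index.get(doc.get(local_field), [])}
        for doc in left_docs
    ]


# Made-up documents standing in for the history and profile collections.
history = [
    {"_id": "h1", "account_id": 1},
    {"_id": "h2", "account_id": 2},
    {"_id": "h3", "account_id": 9},  # no matching profile
]
profile = [
    {"account_id": 1, "full_name": "Ada"},
    {"account_id": 2, "full_name": "Grace"},
]

joined = lookup(history, profile, "account_id", "account_id", "profile_data")

# Rough equivalent of {"$arrayElemAt": ["$profile_data", 0]}, using None when nothing matched.
first_match = {
    d["_id"]: (d["profile_data"][0] if d["profile_data"] else None) for d in joined
}
print(first_match)
```

The unmatched history document keeps an empty profile_data array, so code that reads the first match needs to handle the case where it is absent.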

In [23]:

# Get the five states with the highest number of customers
from bson.son import SON

pipeline = [
    {"$group": {"_id": "$state", "count": {"$sum": 1}}},
    {"$sort": SON([("count", -1), ("_id", -1)])},
    {"$limit": 5}
]

pprint.pprint(list(profile_coll.aggregate(pipeline)))
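The `$group`, `$sort`, and `$limit` stages above amount to counting documents per state, ordering by count and then by state name (both descending), and keeping the top few. A plain-Python sketch of the same logic follows. The top_states helper and the sample profiles are hypothetical and only illustrate the shape of the result.

```python
from collections import Counter


def top_states(profiles, limit=5):
    """Count profiles per state and return the top `limit` as {_id, count} documents."""
    counts = Counter(doc.get("state") for doc in profiles)
    # Sort by count, then by state name, both descending, like the $sort stage.
    ordered = sorted(
        counts.items(),
        key=lambda item: (item[1], item[0] or ""),
        reverse=True,
    )
    return [{"_id": state, "count": count} for state, count in ordered[:limit]]


# Made-up profiles; only the state field matters here.
profiles = [{"state": s} for s in ["TX", "CA", "TX", "NY", "CA", "TX", "NY", "WA"]]
result = top_states(profiles, limit=3)
print(result)
```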

In [24]:

1

import matplotlib.pyplot as plt

data = list(profile_coll.aggregate(pipeline))

print(data)

# Split the aggregation result into state labels and customer counts
states, counts = [str(d['_id']) for d in data], [d['count'] for d in data]

plt.bar(states, counts)
plt.show()

With SingleStore Kai, you can power analytics on SQL and NoSQL data using the API of your choice.

About this Template

Perform both SQL and NoSQL queries on multi-modal relational and JSON data

This Notebook can be run in Standard and Enterprise deployments.

Tags

cdc, mongo, sql, nosql, kai

License

This Notebook has been released under the Apache 2.0 open source license.
