
Real-Time Anomaly Detection Notebook

In this notebook, we embark on a cutting-edge exploration of real-time anomaly detection in IoT sensor data, harnessing the robust capabilities of SingleStoreDB and advanced analytical techniques. Our journey begins with the efficient ingestion of sensor data into SingleStoreDB, setting the stage for dynamic and insightful analysis. The heart of this notebook lies in its innovative approach to handling and interpreting sensor data. We utilize the power of vector embeddings, generated through the UMAP library, to transform high-dimensional sensor readings into a format ripe for anomaly detection. These embeddings, capturing the essence of weather parameters like wind, rain, and temperature, are then seamlessly integrated into SingleStoreDB.
Our focus intensifies as we apply SingleStoreDB's dot_product function to these embeddings, unearthing anomalies in real-time. This not only provides a swift identification of irregularities but also paints a vivid picture of sensor data behavior over time. We don’t just stop at detection; the notebook further enriches the data analysis with a visually engaging, real-time dashboard. This dashboard, crafted using Plotly and Rich libraries, offers an interactive and constantly updated view of the anomalies, allowing users to monitor and respond to sensor data trends as they happen. Join us in this exciting venture as we blend SQL, SingleStoreDB, and Python to unlock new possibilities in real-time anomaly detection. Whether you're a data scientist, an IoT enthusiast, or simply intrigued by the power of real-time analytics, this notebook is your gateway to understanding and leveraging the full potential of IoT sensor data.

Database Setup
Overview
This section focuses on the initial setup of the database iot_sensor_db
in SingleStore, specifically designed for handling IoT sensor data. It includes creating the necessary tables to store both historical and real-time sensor data, along with vector embeddings for anomaly detection.
SQL Script Description
The provided SQL script performs the following operations:
Database Initialization
DROP DATABASE IF EXISTS iot_sensor_db;
Ensures a clean slate by dropping the iot_sensor_db database if it already exists.
CREATE DATABASE iot_sensor_db;
Creates a new database named iot_sensor_db.
USE iot_sensor_db;
Sets iot_sensor_db as the current database for subsequent operations.
Table Creation
CREATE TABLE sensor_data_with_vectors
This table is designed to store processed sensor data along with vector embeddings for anomaly detection.
Columns:
date: Timestamp of the sensor data.
city, longitude, latitude: Location information of the sensor.
vent, pluie, temp: Sensor readings for wind (vent), rain (pluie), and temperature (temp).
anomaly: Flag indicating whether the data point is an anomaly.
embeddings: Text column for storing vector embeddings.
CREATE TABLE sensor_data_stage
Serves as a staging area for raw sensor data before processing.
Columns: Similar to sensor_data_with_vectors, but without the anomaly column; the date column is populated at load time.
In [1]:
%%sql
DROP DATABASE IF EXISTS iot_sensor_db;

CREATE DATABASE iot_sensor_db;

USE iot_sensor_db;

CREATE TABLE sensor_data_with_vectors (
    date DATETIME,
    city VARCHAR(50),
    longitude VARCHAR(50),
    latitude VARCHAR(50),
    vent FLOAT(8,2),
    pluie FLOAT(8,2),
    temp FLOAT(8,2),
    anomaly VARCHAR(10),
    embeddings TEXT
);

CREATE TABLE sensor_data_stage (
    city VARCHAR(50),
    longitude VARCHAR(50),
    latitude VARCHAR(50),
    vent FLOAT(8,2),
    pluie FLOAT(8,2),
    temp FLOAT(8,2),
    embeddings TEXT,
    date DATETIME
);
Setting Up and Initiating the Sensor Data Pipeline
Overview
This section details the setup and initiation of two pipelines in SingleStore: sensor_data_pipeline
for historical data load and sensor_realtime_data_pipeline
for real-time data analysis. Both pipelines stream and ingest IoT sensor data from S3 buckets into respective tables in SingleStore.
SQL Script Description
Historical Data Load Pipeline
Pipeline Creation
CREATE OR REPLACE PIPELINE sensor_data_pipeline AS
Creates or replaces a pipeline named sensor_data_pipeline.
Configuration:
Source: S3 bucket path s3://s2db-demos-pub-bucket/real-time-anomaly-detection-demo/demothon/with_cities_embeddings.csv.
Target: Table sensor_data_with_vectors.
Data Format: CSV with comma field delimiters, double-quote enclosures, and the header line ignored.
Pipeline Activation
START PIPELINE sensor_data_pipeline FOREGROUND;
Initiates the pipeline for data ingestion.
Data Verification
SELECT * FROM sensor_data_with_vectors limit 2;
Fetches the first two rows from sensor_data_with_vectors to verify data ingestion.
Real-Time Data Analysis Pipeline
Pipeline Creation
CREATE OR REPLACE PIPELINE sensor_realtime_data_pipeline AS
Establishes a new pipeline named sensor_realtime_data_pipeline.
Configuration:
Source: S3 bucket path s3://s2db-demos-pub-bucket/real-time-anomaly-detection-demo/demothon/demo_day_data2.csv.
Target: Table sensor_data_stage.
Data Format: CSV with comma field delimiters, double-quote enclosures, and the header line ignored.
Additional Setting: SET date = NOW(); assigns the current timestamp to the date column at ingest time.
Pipeline Activation
START PIPELINE sensor_realtime_data_pipeline FOREGROUND;
Activates the pipeline for real-time data ingestion.
Data Verification
SELECT * FROM sensor_data_stage limit 1;
Retrieves the first row from sensor_data_stage to confirm data ingestion.
Usage
The establishment of these pipelines is essential for the real-time and historical analysis of IoT sensor data. sensor_data_pipeline
facilitates the ingestion of historical data for retrospective analysis, while sensor_realtime_data_pipeline
caters to ongoing, real-time data analysis needs.
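Before relying on either pipeline, you can optionally confirm that it is registered and loading files without errors. This check is not part of the original notebook; it assumes SingleStore's SHOW PIPELINES command and the information_schema.PIPELINES_ERRORS view are available in your deployment:
%%sql
SHOW PIPELINES;
SELECT * FROM information_schema.PIPELINES_ERRORS;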
In [2]:
%%sql

CREATE OR REPLACE PIPELINE sensor_data_pipeline AS
LOAD DATA S3 's3://s2db-demos-pub-bucket/real-time-anomaly-detection-demo/demothon/with_cities_embeddings.csv'
INTO TABLE `sensor_data_with_vectors`
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\n'
IGNORE 1 LINES;
In [3]:
%%sql
START PIPELINE sensor_data_pipeline FOREGROUND;
In [4]:
%%sql
SELECT * FROM sensor_data_with_vectors limit 2;
In [5]:
%%sql
CREATE OR REPLACE PIPELINE sensor_realtime_data_pipeline AS
LOAD DATA S3 's3://s2db-demos-pub-bucket/real-time-anomaly-detection-demo/demothon/demo_day_data2.csv'
INTO TABLE `sensor_data_stage`
(city, longitude, latitude, vent, pluie, temp, embeddings)
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\r\n'
IGNORE 1 LINES
SET date = NOW();
In [6]:
%%sql
START PIPELINE sensor_realtime_data_pipeline FOREGROUND;
In [7]:
%%sql
SELECT * FROM sensor_data_stage limit 1;
Data Preparation for Analysis
Overview
This section covers the necessary steps to prepare IoT sensor data for analysis. It involves installing a required library, data retrieval from the sensor_data_stage
table, and preprocessing to ensure data quality.
Python Script Description
Library Installation
!pip install umap-learn --quiet
Installs the umap-learn library quietly, without verbose output. UMAP (Uniform Manifold Approximation and Projection) is used for dimensionality reduction.
Import Statements
Note: It's advised to restart the Python kernel before importing umap to ensure the library is properly loaded.
Imports umap, normalize from sklearn.preprocessing, sqlalchemy, create_engine, json, and pandas.
Database Connection
engine = create_engine(connection_url)
Establishes a connection to the database using connection_url.
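In SingleStore notebooks, connection_url is injected automatically for the selected workspace and database. Outside that environment you would need to build it yourself; a minimal sketch using SQLAlchemy's URL format (the host, user, and password below are placeholders, not values from this notebook):
from sqlalchemy import create_engine

# Hypothetical credentials and host; substitute your own workspace values.
connection_url = "mysql+pymysql://admin:MY_PASSWORD@svc-example-host.svc.singlestore.com:3306/iot_sensor_db"
engine = create_engine(connection_url)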
Data Retrieval and Preprocessing
df = pd.read_sql('select * from sensor_data_stage', engine)
Retrieves data from the sensor_data_stage table into a pandas DataFrame.
df = df.bfill(axis=0)
Fills null values in the DataFrame by propagating the next non-null value backward along each column.
df = df.dropna()
Drops any remaining rows with null values to ensure the dataset is clean for analysis.
In [8]:
!pip install umap-learn --quiet
Note
Restart the kernel if importing umap raises an error.
In [9]:
import umap
from sklearn.preprocessing import normalize

import sqlalchemy
from sqlalchemy import create_engine
import json
import pandas as pd
In [10]:
# Filling null values using bfill()

engine = create_engine(connection_url)

df = pd.read_sql('select * from sensor_data_stage', engine)

df = df.bfill(axis=0)

df = df.dropna()
Generating Vector Embeddings using UMAP Library
Overview
This section focuses on creating vector embeddings from sensor data using the UMAP library. The embeddings are generated to reduce the dimensionality of the data while preserving its structure, aiding in efficient analysis.
Python Script Description
Data Selection for Embedding Generation
new_df1 = df.iloc[50:100]
Creates a subset of the DataFrame df, selecting rows 50 through 99 (the iloc end index is exclusive). This subset is used for generating vector embeddings.
Feature Selection
features = new_df1[['vent', 'pluie', 'temp']]
Selects the columns vent, pluie, and temp from new_df1 as features for the embedding process. These represent sensor readings for wind, rain, and temperature.
UMAP Reducer Initialization and Transformation
reducer = umap.UMAP(n_components=15)
Initializes a UMAP reducer configured to produce embeddings with 15 components.
embeddings = reducer.fit_transform(features)
Fits the reducer to the selected features and transforms them, producing 15-dimensional vector embeddings of the sensor readings.
Normalization and Embedding Storage
normalized_embeddings = normalize(embeddings, norm='l2')
Normalizes the generated embeddings using the L2 norm, so each embedding vector has unit length.
new_df1['embeddings'] = list(normalized_embeddings)
Stores the normalized embeddings as a new column in new_df1.
Displaying Results
new_df1.head()
Displays the first few rows of new_df1 to verify that the embeddings were generated and attached correctly.
In [11]:
# Generate embeddings for the real-time data
new_df1 = df.iloc[50:100]
features = new_df1[['vent', 'pluie', 'temp']]

reducer = umap.UMAP(n_components=15)
embeddings = reducer.fit_transform(features)
In [12]:
normalized_embeddings = normalize(embeddings, norm='l2')
new_df1['embeddings'] = list(normalized_embeddings)
In [13]:
new_df1.head()
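One caveat worth noting: the embeddings column in sensor_data_with_vectors is stored as text and later passed to JSON_ARRAY_PACK, which expects a JSON array string such as '[0.1, 0.2, ...]'. If you intend to write these freshly generated embeddings back to that table, a small sketch (an assumption about the intended storage format, not code from the original notebook) that serializes each embedding explicitly:
import json

# Serialize each normalized embedding to a JSON array string (e.g. "[0.12, -0.05, ...]")
# so that JSON_ARRAY_PACK can parse it when the row is queried later.
new_df1['embeddings'] = [json.dumps([float(x) for x in emb]) for emb in normalized_embeddings]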
Anomaly Detection and Data Integration
Anomaly Detection Using SingleStore dot_product Function
Anomaly Detection Loop:
Iterates over each row in new_df.
Extracts the row's embeddings and parses them into a JSON-compatible list.
Constructs an SQL query using SingleStore's dot_product function to measure similarity between the current row's embeddings and those stored in the sensor_data_with_vectors table.
The query returns the most common anomaly label among the 20 most similar historical records.
SQL query execution: result = pd.read_sql_query(query, con=engine).
The resulting anomaly label is written back to new_df, or set to a default value ('none') if no similar records are found.
Data Appending to Historical Table
Data Type Casting:
Ensures appropriate data types for the columns in new_df (e.g., converting date to datetime, and city, longitude, and latitude to strings).
Appending to SQL Table:
new_df.to_sql('sensor_data_with_vectors', con=engine, if_exists='append', index=False) appends the processed data in new_df to the sensor_data_with_vectors table in the database.
In [14]:
new_df = df.iloc[50:70].copy()

# iterate over each row in the new DataFrame
for index, row in new_df.iterrows():
    # get the embeddings from the current row
    embeddings = row['embeddings']

    # parse the embeddings string (stored as a JSON array) into a Python list
    embeddings_json = json.loads(embeddings)

    # create the query string
    query = f"""
    SELECT anomaly, COUNT(anomaly) as count
    FROM (
        SELECT anomaly, dot_product(
            JSON_ARRAY_PACK('{embeddings_json}'),
            JSON_ARRAY_PACK(sensor_data_with_vectors.embeddings)
        ) AS similarity
        FROM sensor_data_with_vectors
        ORDER BY similarity DESC
        LIMIT 20
    ) AS subquery
    GROUP BY anomaly
    ORDER BY count DESC;
    """

    # execute the query
    result = pd.read_sql_query(query, con=engine)

    # check if the result is empty
    if not result.empty:
        # write the most common anomaly label to the current row
        new_df.loc[index, 'anomaly'] = result['anomaly'].values[0]
    else:
        # set anomaly to a default value
        new_df.loc[index, 'anomaly'] = 'none'
In [15]:
new_df.head()
In [16]:
# Append the new DataFrame to the main table: sensor_data_with_vectors
new_df['date'] = pd.to_datetime(new_df['date'])
new_df['city'] = new_df['city'].astype(str)
new_df['longitude'] = new_df['longitude'].astype(str)
new_df['latitude'] = new_df['latitude'].astype(str)
new_df['vent'] = new_df['vent'].astype(float)
new_df['pluie'] = new_df['pluie'].astype(float)
new_df['temp'] = new_df['temp'].astype(float)
new_df['anomaly'] = new_df['anomaly'].astype(str)
new_df['embeddings'] = new_df['embeddings'].astype(str)

# Append data to SQL table
new_df.to_sql('sensor_data_with_vectors', con=engine, if_exists='append', index=False)
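As a quick sanity check (not part of the original notebook), you can confirm the append succeeded by counting rows with the same engine:
# Optional verification: count rows in the target table after the append.
print(pd.read_sql('SELECT COUNT(*) AS row_count FROM sensor_data_with_vectors', engine))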
Dashboard for Monitoring Anomalies over Time
Data Visualization Setup
Library Imports:
pandas, plotly.express, and sqlalchemy.
Database Connection: Establishes a connection to the database using create_engine(connection_url).
Data Retrieval and Processing
SQL Query for Data Fetching: Retrieves anomaly data from the sensor_data_with_vectors table, excluding entries with 'none' in the anomaly field.
Data Preparation:
Converts the date column to datetime format and extracts the date part into a new column, date_only.
Groups data by date_only and anomaly, counting occurrences to prepare for visualization.
Plotting Anomalies over Time
Overall Anomaly Trends:
Utilizes plotly.express to create a histogram of anomaly counts over time.
Each anomaly type is color-coded for distinction.
City-Specific Anomaly Trends:
Further groups the data by city along with date_only and anomaly.
Loops through a predefined list of cities to create a separate histogram for each city, showcasing city-specific anomaly trends.
In [17]:
import pandas as pd
import plotly.express as px
from sqlalchemy import create_engine

engine = create_engine(connection_url)
In [18]:
# df = pd.read_sql('select * from sensor_data_with_vectors limit 50000;', engine)
df = pd.read_sql("select * from sensor_data_with_vectors where anomaly <> 'none' limit 50000;", engine)
In [19]:
df['date'] = pd.to_datetime(df['date'])
df['date_only'] = df['date'].dt.date

# Group data by date and anomaly, then count the instances
grouped_df = df.groupby(['date_only', 'anomaly']).size().reset_index(name='counts')

# Create a histogram with Plotly
fig = px.histogram(grouped_df, x='date_only', y='counts', color='anomaly',
                   title='Anomalies over Time', labels={'date_only': 'Date', 'counts': 'Anomaly Count'})

# Show plot
fig.show()
In [20]:
# Group data by date, city and anomaly, then count the instances
grouped_df = df.groupby(['date_only', 'city', 'anomaly']).size().reset_index(name='counts')

# List your cities
cities = ['Washington DC', 'New York', 'Los Angeles']  # Add or change according to your DataFrame

# Create a separate plot for each city
for city in cities:
    city_df = grouped_df[grouped_df['city'] == city]
    fig = px.histogram(city_df, x='date_only', y='counts', color='anomaly',
                       title=f'Anomalies over Time for {city}', labels={'date_only': 'Date', 'counts': 'Anomaly Count'})
    fig.show()
Real-Time Anomaly Detection Dashboard
Environment Setup and Library Imports
Library Installation: Installs the tabulate, pymysql, IPython, and rich libraries.
Imports: Includes time, os, shutil, pymysql, rich.console, rich.table, IPython.display, sqlalchemy, and pandas.
Dashboard Function Definition
Function display_table_contents:
Establishes a database connection using create_engine(connection_url).
Executes an SQL query to fetch the initial data from sensor_data_with_vectors with specific columns (date, vent, pluie, temp, anomaly).
Enters a loop to continuously update and display the dashboard.
Dashboard Display Mechanics
Console and Table Setup:
Clears the console output and creates a console instance with rich.console.
Determines the terminal width and sets up a dynamic table layout.
Adds columns with specific styles and alignments for better readability.
Data Display and Refresh Loop:
Adds the top 50 rows from the fetched data to the table.
Styles rows based on the anomaly type (e.g., different colors for different anomaly types).
Refreshes the display every 10 seconds, fetching updated data from the database.
In [21]:
!pip install tabulate pymysql Ipython rich --quiet
In [22]:
import time
import os
import shutil
import pymysql
from rich.console import Console
from rich.table import Table
from rich import box
from IPython.display import clear_output
In [23]:
from sqlalchemy import create_engine
import pandas as pd

def display_table_contents():
    # Create a database engine
    engine = create_engine(connection_url)

    # Execute query to fetch initial table contents
    query = 'SELECT date, vent, pluie, temp, anomaly FROM sensor_data_with_vectors ORDER BY date DESC'
    table_data = pd.read_sql_query(query, engine)

    while True:
        # Clear console output
        clear_output(wait=True)

        # Create a console instance
        console = Console()

        # Get the terminal width
        terminal_width = shutil.get_terminal_size().columns

        # Print the title with centered alignment
        title = "[bold magenta]REAL TIME ANALYTICS DASHBOARD[/bold magenta]"
        console.print(title.center(terminal_width))

        # Create a table instance
        table = Table(show_header=True, header_style="bold", box=None)

        # Add columns to the table
        table.add_column("Date", justify="center", style="cyan", width=terminal_width // 5 + 5)
        table.add_column("Vent", justify="center", style="magenta", width=terminal_width // 5)
        table.add_column("Pluie", justify="center", style="yellow", width=terminal_width // 5)
        table.add_column("Temp", justify="center", style="green", width=terminal_width // 5)
        table.add_column("Anomaly", justify="center", width=terminal_width // 5)

        # Add rows to the table
        for row in table_data.head(50).itertuples(index=False):
            # Convert datetime to string before adding to the table
            formatted_row = [str(cell) for cell in row]

            # Check the anomaly type
            anomaly_type = formatted_row[4]

            # Determine the style based on the anomaly type
            if anomaly_type == 'pluie':
                style = "bold blue"
            elif anomaly_type == 'vent':
                style = "bold magenta"
            elif anomaly_type == 'temp':
                style = "bold green"
            else:
                style = ""

            # Add the row with the appropriate style
            table.add_row(*formatted_row, style=style)

        # Print the table
        console.print(table)

        # Wait for 10 seconds before refreshing
        time.sleep(10)

        # Execute query to fetch updated table contents
        updated_data = pd.read_sql_query(query, engine)

        # Update the table_data with the fetched data
        table_data = updated_data

# Call the function to start displaying the table contents
display_table_contents()

Details
About this Template
Real-time anomaly detection in IoT sensor data, harnessing the robust capabilities of SingleStoreDB and advanced analytical techniques.
This Notebook can be run in Standard and Enterprise deployments.
License
This Notebook has been released under the Apache 2.0 open source license.