Real-Time Anomaly Detection

In this notebook we explore real-time anomaly detection in IoT sensor data, using the capabilities of SingleStoreDB alongside standard Python analytical tooling. The journey begins with the efficient ingestion of sensor data into SingleStoreDB, which sets the stage for the analysis that follows. At the heart of the notebook is how the sensor data is represented: we use the UMAP library to transform high-dimensional sensor readings into vector embeddings suited to anomaly detection. These embeddings, which capture weather parameters such as wind (vent), rain (pluie), and temperature (temp), are then stored in SingleStoreDB alongside the readings themselves.

We then apply SingleStoreDB's dot_product function to these embeddings to surface anomalies in real time. This provides both swift identification of irregularities and a clear picture of how the sensor data behaves over time. Detection is only half the story: the notebook also builds a real-time dashboard with the Plotly and Rich libraries, offering an interactive, continuously updated view of the anomalies so that you can monitor and respond to sensor trends as they happen. Whether you're a data scientist, an IoT enthusiast, or simply curious about real-time analytics, this notebook blends SQL, SingleStoreDB, and Python to show how to get the most out of IoT sensor data.

Architecture Diagram

Database Setup

Overview

This section focuses on the initial setup of the database iot_sensor_db in SingleStore, specifically designed for handling IoT sensor data. It includes creating the necessary tables to store both historical and real-time sensor data, along with vector embeddings for anomaly detection.

SQL Script Description

The provided SQL script performs the following operations:

  1. Database Initialization

    • DROP DATABASE IF EXISTS iot_sensor_db; Ensures a clean slate by dropping the iot_sensor_db database if it already exists.

    • CREATE DATABASE iot_sensor_db; Creates a new database named iot_sensor_db.

    • USE iot_sensor_db; Sets iot_sensor_db as the current database for subsequent operations.

  2. Table Creation

    • CREATE TABLE sensor_data_with_vectors This table is designed to store processed sensor data along with vector embeddings for anomaly detection.

      • Columns:

        • date: Timestamp of the sensor data.

        • city, longitude, latitude: Location information of the sensor.

        • vent, pluie, temp: Sensor readings for wind (vent), rain (pluie), and temperature (temp).

        • anomaly: Flag indicating whether the data point is an anomaly.

        • embeddings: Text column for storing vector embeddings.

    • CREATE TABLE sensor_data_stage Serves as a staging area for raw sensor data before processing.

      • Columns: The same fields as sensor_data_with_vectors, minus the anomaly column; the date column is listed last and is populated at ingest time by the real-time pipeline.

In [1]:

%%sql
DROP DATABASE IF EXISTS iot_sensor_db;

CREATE DATABASE iot_sensor_db;

USE iot_sensor_db;

CREATE TABLE sensor_data_with_vectors (
    date DATETIME,
    city VARCHAR(50),
    longitude VARCHAR(50),
    latitude VARCHAR(50),
    vent FLOAT(8,2),
    pluie FLOAT(8,2),
    temp FLOAT(8,2),
    anomaly VARCHAR(10),
    embeddings TEXT
);

CREATE TABLE sensor_data_stage (
    city VARCHAR(50),
    longitude VARCHAR(50),
    latitude VARCHAR(50),
    vent FLOAT(8,2),
    pluie FLOAT(8,2),
    temp FLOAT(8,2),
    embeddings TEXT,
    date DATETIME
);

Setting Up and Initiating the Sensor Data Pipeline

Overview

This section details the setup and initiation of two pipelines in SingleStore: sensor_data_pipeline for the historical data load and sensor_realtime_data_pipeline for real-time data analysis. Both pipelines stream IoT sensor data from a public S3 bucket into their respective tables in SingleStore.

SQL Script Description

Historical Data Load Pipeline
  1. Pipeline Creation

    • CREATE OR REPLACE PIPELINE sensor_data_pipeline AS Creates or replaces a pipeline named sensor_data_pipeline.

    • Configuration:

      • Source: S3 bucket path s3://s2db-demos-pub-bucket/real-time-anomaly-detection-demo/demothon/with_cities_embeddings.csv.

      • Target: Table sensor_data_with_vectors.

      • Data Format: CSV, comma-delimited with optional double-quote enclosures and \n line endings; the header line is ignored.

  2. Pipeline Activation

    • START PIPELINE sensor_data_pipeline FOREGROUND; Initiates the pipeline for data ingestion.

  3. Data Verification

    • SELECT * FROM sensor_data_with_vectors limit 2; Fetches the first two rows from sensor_data_with_vectors to verify data ingestion.

Real-Time Data Analysis Pipeline
  1. Pipeline Creation

    • CREATE OR REPLACE PIPELINE sensor_realtime_data_pipeline AS Establishes a new pipeline named sensor_realtime_data_pipeline.

    • Configuration:

      • Source: S3 bucket path s3://s2db-demos-pub-bucket/real-time-anomaly-detection-demo/demothon/demo_day_data2.csv.

      • Target: Table sensor_data_stage.

      • Data Format: CSV, comma-delimited with optional double-quote enclosures and \r\n line endings; the header line is ignored.

      • Additional Setting: SET date = NOW(); assigns the current timestamp to the date column.

  2. Pipeline Activation

    • START PIPELINE sensor_realtime_data_pipeline FOREGROUND; Activates the pipeline for real-time data ingestion.

  3. Data Verification

    • SELECT * FROM sensor_data_stage limit 1; Retrieves the first row from sensor_data_stage to confirm data ingestion.

Usage

The establishment of these pipelines is essential for the real-time and historical analysis of IoT sensor data. sensor_data_pipeline facilitates the ingestion of historical data for retrospective analysis, while sensor_realtime_data_pipeline caters to ongoing, real-time data analysis needs.
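
Before moving on, it can help to confirm that both pipelines exist and have completed their foreground runs. The following optional cell is a small addition to the flow described above; SHOW PIPELINES lists each pipeline in the current database together with its state:

%%sql
-- Optional sanity check (not part of the original notebook flow):
-- list the pipelines in iot_sensor_db and their current state
SHOW PIPELINES;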

In [2]:

%%sql

CREATE OR REPLACE PIPELINE sensor_data_pipeline AS
LOAD DATA S3 's3://s2db-demos-pub-bucket/real-time-anomaly-detection-demo/demothon/with_cities_embeddings.csv'
INTO TABLE `sensor_data_with_vectors`
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\n'
IGNORE 1 LINES;

In [3]:

%%sql
START PIPELINE sensor_data_pipeline FOREGROUND;

In [4]:

%%sql
SELECT * FROM sensor_data_with_vectors limit 2;

In [5]:

%%sql
CREATE OR REPLACE PIPELINE sensor_realtime_data_pipeline AS
LOAD DATA S3 's3://s2db-demos-pub-bucket/real-time-anomaly-detection-demo/demothon/demo_day_data2.csv'
INTO TABLE `sensor_data_stage`
(city, longitude, latitude, vent, pluie, temp, embeddings)
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\r\n'
IGNORE 1 LINES
SET date = NOW();

In [6]:

%%sql
START PIPELINE sensor_realtime_data_pipeline FOREGROUND;

In [7]:

%%sql
SELECT * FROM sensor_data_stage limit 1;

Data Preparation for Analysis

Overview

This section covers the necessary steps to prepare IoT sensor data for analysis. It involves installing a required library, data retrieval from the sensor_data_stage table, and preprocessing to ensure data quality.

Python Script Description

  1. Library Installation

    • !pip install umap-learn --quiet Installs the umap-learn library quietly without verbose output. UMAP (Uniform Manifold Approximation and Projection) is used for dimensionality reduction.

  2. Import Statements

    • Note: It's advised to restart the Python Kernel before importing umap to ensure the library is properly loaded.

    • Imports various libraries including umap, normalize from sklearn.preprocessing, sqlalchemy, create_engine, json, and pandas.

  3. Database Connection

    • engine = create_engine(connection_url) Establishes a connection to the database using the connection_url variable supplied by the notebook environment.

  4. Data Retrieval and Preprocessing

    • df = pd.read_sql('select * from sensor_data_stage', engine) Retrieves data from the sensor_data_stage table into a pandas DataFrame.

    • df = df.bfill(axis=0) Fills null values in the DataFrame by propagating non-null values backward.

    • df = df.dropna() Drops any remaining rows with null values to ensure the dataset is clean for analysis.

In [8]:

!pip install umap-learn --quiet

Note

Restart the kernel if importing umap raises an error.

In [9]:

import umap
from sklearn.preprocessing import normalize

import sqlalchemy
from sqlalchemy import create_engine
import json
import pandas as pd

In [10]:

# Fill null values using bfill()

engine = create_engine(connection_url)

df = pd.read_sql('select * from sensor_data_stage', engine)

df = df.bfill(axis=0)

df = df.dropna()

Generating Vector Embeddings using UMAP Library

Overview

This section focuses on creating vector embeddings from sensor data using the UMAP library. The embeddings are generated to reduce the dimensionality of the data while preserving its structure, aiding in efficient analysis.

Python Script Description

  1. Data Selection for Embedding Generation

    • new_df1 = df.iloc[50:100] Creates a subset of the DataFrame df, selecting rows 50 through 99 (the end index is exclusive). This subset is used for generating vector embeddings.

  2. Feature Selection

    • features = new_df1[['vent', 'pluie', 'temp']] Selects the columns vent, pluie, and temp from new_df1 as features for the embedding process. These represent sensor readings for wind, rain, and temperature.

  3. UMAP Reducer Initialization and Transformation

    • reducer = umap.UMAP(n_components=15) Initializes a UMAP reducer to reduce the feature space to 15 components.

    • embeddings = reducer.fit_transform(features) Applies the UMAP transformation to the selected features, generating low-dimensional embeddings from the high-dimensional sensor data.

  4. Normalization and Embedding Storage

    • normalized_embeddings = normalize(embeddings, norm='l2') Normalizes the generated embeddings to unit length using the L2 norm, so that a dot product between two embeddings equals their cosine similarity (see the short sketch after cell In [12] below).

    • new_df1['embeddings'] = list(normalized_embeddings) Appends the normalized embeddings as a new column to new_df1.

  5. Displaying Results

    • new_df1.head() Displays the first few rows of new_df1 to verify the embedding generation and integration process.

In [11]:

# generate embeddings for the real-time data
new_df1 = df.iloc[50:100]
features = new_df1[['vent', 'pluie', 'temp']]

reducer = umap.UMAP(n_components=15)
embeddings = reducer.fit_transform(features)

In [12]:

normalized_embeddings = normalize(embeddings, norm='l2')
new_df1['embeddings'] = list(normalized_embeddings)
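
As a quick aside, here is an illustrative sketch (not part of the original notebook) of why the L2 normalization matters: dot products of unit-length vectors are exactly their cosine similarity, which is the quantity the dot_product-based anomaly search below relies on.

import numpy as np

# Illustrative check: after L2 normalization, dot product == cosine similarity
a, b = normalized_embeddings[0], normalized_embeddings[1]

dot = float(np.dot(a, b))
cosine = float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# The two values agree because ||a|| == ||b|| == 1 after normalization
assert abs(dot - cosine) < 1e-6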

In [13]:

new_df1.head()

Anomaly Detection and Data Integration

Anomaly Detection Using SingleStore dot_product Function

  • Anomaly Detection Loop:

    • Iterates over each row in new_df (a copy of rows 50 through 69 of the staged data).

    • Parses each row's embeddings from their JSON string representation.

    • Constructs an SQL query using SingleStore's dot_product function to measure similarity between the current row's embeddings and those stored in the sensor_data_with_vectors table (a standalone example of dot_product follows this list).

    • The query keeps the 20 most similar historical rows and returns their anomaly labels with counts, in effect a k-nearest-neighbour majority vote.

    • SQL query execution: result = pd.read_sql_query(query, con=engine).

    • The winning label is written back to new_df, or a default value of 'none' is used when no similar records are found.
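
To make the similarity metric concrete, here is a tiny standalone illustration (not part of the detection loop) of dot_product over two JSON_ARRAY_PACKed vectors. Because both vectors are unit length, the result, 0.6, is also their cosine similarity:

%%sql
-- Illustration only: dot_product of two packed unit vectors returns 0.6
SELECT DOT_PRODUCT(
    JSON_ARRAY_PACK('[1.0, 0.0, 0.0]'),
    JSON_ARRAY_PACK('[0.6, 0.8, 0.0]')
) AS similarity;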

Data Appending to Historical Table

  • Data Type Casting:

    • Ensures appropriate data types for columns in new_df (e.g., converting date to datetime, city, longitude, latitude to strings, etc.).

  • Appending to SQL Table:

    • new_df.to_sql('sensor_data_with_vectors', con=engine, if_exists='append', index=False) appends the processed data in new_df to the sensor_data_with_vectors table in the database.

In [14]:

new_df = df.iloc[50:70].copy()

# iterate over each row in the new DataFrame
for index, row in new_df.iterrows():
    # get the embeddings from the current row
    embeddings = row['embeddings']

    # parse the JSON string from the stage table into a Python list
    embeddings_json = json.loads(embeddings)

    # create the query string
    query = f"""
        SELECT anomaly, COUNT(anomaly) AS count
        FROM (
            SELECT anomaly, dot_product(
                JSON_ARRAY_PACK('{embeddings_json}'),
                JSON_ARRAY_PACK(sensor_data_with_vectors.embeddings)
            ) AS similarity
            FROM sensor_data_with_vectors
            ORDER BY similarity DESC
            LIMIT 20
        ) AS subquery
        GROUP BY anomaly
        ORDER BY count DESC;
    """

    # execute the query
    result = pd.read_sql_query(query, con=engine)

    # check if the result is empty
    if not result.empty:
        # write the majority label among the 20 nearest neighbours back to the row
        new_df.loc[index, 'anomaly'] = result['anomaly'].values[0]
    else:
        # fall back to a default value when no similar records are found
        new_df.loc[index, 'anomaly'] = 'none'

In [15]:

new_df.head()

In [16]:

# append the new DataFrame to the main table: sensor_data_with_vectors
new_df['date'] = pd.to_datetime(new_df['date'])
new_df['city'] = new_df['city'].astype(str)
new_df['longitude'] = new_df['longitude'].astype(str)
new_df['latitude'] = new_df['latitude'].astype(str)
new_df['vent'] = new_df['vent'].astype(float)
new_df['pluie'] = new_df['pluie'].astype(float)
new_df['temp'] = new_df['temp'].astype(float)
new_df['anomaly'] = new_df['anomaly'].astype(str)
new_df['embeddings'] = new_df['embeddings'].astype(str)

# Append data to SQL table
new_df.to_sql('sensor_data_with_vectors', con=engine, if_exists='append', index=False)
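
Optionally (an extra check, not in the original notebook), you can confirm the append landed by counting rows in the historical table; the total should have grown by the 20 freshly labelled rows:

%%sql
-- Optional verification that the append succeeded
SELECT COUNT(*) AS total_rows FROM sensor_data_with_vectors;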

Dashboard for Monitoring Anomalies over Time

Data Visualization Setup

  • Library Imports: pandas, plotly.express, and sqlalchemy.

  • Database Connection: Establishes connection to the database using create_engine(connection_url).

Data Retrieval and Processing

  • SQL Query for Data Fetching: Retrieves anomaly data from sensor_data_with_vectors table, excluding entries with 'none' in the anomaly field.

  • Data Preparation:

    • Converts date column to datetime format and extracts date part into a new column date_only.

    • Groups data by date_only and anomaly, counting occurrences to prepare for visualization.

Plotting Anomalies over Time

  • Overall Anomaly Trends:

    • Utilizes plotly.express to create a histogram representing anomaly counts over time.

    • Each anomaly type is color-coded for distinction.

  • City-Specific Anomaly Trends:

    • Further groups data by city along with date_only and anomaly.

    • Loops through a predefined list of cities to create separate histograms for each city, showcasing city-specific anomaly trends.

In [17]:

import pandas as pd
import plotly.express as px
from sqlalchemy import create_engine

engine = create_engine(connection_url)

In [18]:

# df = pd.read_sql('select * from sensor_data_with_vectors limit 50000;', engine)
df = pd.read_sql("select * from sensor_data_with_vectors where anomaly <> 'none' limit 50000;", engine)

In [19]:

df['date'] = pd.to_datetime(df['date'])
df['date_only'] = df['date'].dt.date

# Group data by date and anomaly, then count the instances
grouped_df = df.groupby(['date_only', 'anomaly']).size().reset_index(name='counts')

# Create a histogram with Plotly
fig = px.histogram(grouped_df, x='date_only', y='counts', color='anomaly',
                   title='Anomalies over Time', labels={'date_only': 'Date', 'counts': 'Anomaly Count'})

# Show plot
fig.show()

In [20]:

# Group data by date, city and anomaly, then count the instances
grouped_df = df.groupby(['date_only', 'city', 'anomaly']).size().reset_index(name='counts')

# List your cities
cities = ['Washington DC', 'New York', 'Los Angeles']  # Add or change according to your DataFrame

# Create a separate plot for each city
for city in cities:
    city_df = grouped_df[grouped_df['city'] == city]
    fig = px.histogram(city_df, x='date_only', y='counts', color='anomaly',
                       title=f'Anomalies over Time for {city}', labels={'date_only': 'Date', 'counts': 'Anomaly Count'})
    fig.show()

Real-Time Anomaly Detection Dashboard

Environment Setup and Library Imports

  • Library Installation: Installs tabulate, pymysql, Ipython, and rich libraries.

  • Imports: Includes libraries such as time, os, shutil, pymysql, rich.console, rich.table, IPython.display, sqlalchemy, and pandas.

Dashboard Function Definition

  • Function display_table_contents:

    • Establishes a database connection using create_engine(connection_url).

    • Executes an SQL query to fetch initial data from sensor_data_with_vectors with specific columns (date, vent, pluie, temp, anomaly).

    • Enters a loop to continuously update and display the dashboard.

Dashboard Display Mechanics

  • Console and Table Setup:

    • Clears the console output and creates a console instance with rich.console.

    • Determines the terminal width and sets up a dynamic table layout.

    • Adds columns with specific styles and alignments for better readability.

  • Data Display and Refresh Loop:

    • Adds the top 50 rows from the fetched data to the table.

    • Styles rows based on the anomaly type (e.g., different colors for different anomaly types).

    • Refreshes the display every 10 seconds, fetching updated data from the database.

In [21]:

!pip install tabulate pymysql Ipython rich --quiet

In [22]:

import time
import os
import shutil
import pymysql
from rich.console import Console
from rich.table import Table
from rich import box
from IPython.display import clear_output

In [23]:

from sqlalchemy import create_engine
import pandas as pd

def display_table_contents():
    # Create a database engine
    engine = create_engine(connection_url)

    # Execute query to fetch initial table contents
    query = 'SELECT date, vent, pluie, temp, anomaly FROM sensor_data_with_vectors ORDER BY date DESC'
    table_data = pd.read_sql_query(query, engine)

    while True:
        # Clear console output
        clear_output(wait=True)

        # Create a console instance
        console = Console()

        # Get the terminal width
        terminal_width = shutil.get_terminal_size().columns

        # Print the title with centered alignment
        title = "[bold magenta]REAL TIME ANALYTICS DASHBOARD[/bold magenta]"
        console.print(title.center(terminal_width))

        # Create a table instance
        table = Table(show_header=True, header_style="bold", box=None)

        # Add columns to the table
        table.add_column("Date", justify="center", style="cyan", width=terminal_width // 5 + 5)
        table.add_column("Vent", justify="center", style="magenta", width=terminal_width // 5)
        table.add_column("Pluie", justify="center", style="yellow", width=terminal_width // 5)
        table.add_column("Temp", justify="center", style="green", width=terminal_width // 5)
        table.add_column("Anomaly", justify="center", width=terminal_width // 5)

        # Add rows to the table
        for row in table_data.head(50).itertuples(index=False):
            # Convert each cell (including the datetime) to a string before adding to the table
            formatted_row = [str(cell) for cell in row]

            # Check the anomaly type
            anomaly_type = formatted_row[4]

            # Determine the style based on the anomaly type
            if anomaly_type == 'pluie':
                style = "bold blue"
            elif anomaly_type == 'vent':
                style = "bold magenta"
            elif anomaly_type == 'temp':
                style = "bold green"
            else:
                style = ""

            # Add the row with the appropriate style
            table.add_row(*formatted_row, style=style)

        # Print the table
        console.print(table)

        # Wait for 10 seconds before refreshing
        time.sleep(10)

        # Execute query to fetch updated table contents
        updated_data = pd.read_sql_query(query, engine)

        # Update the table_data with the fetched data
        table_data = updated_data

# Call the function to start displaying the table contents
display_table_contents()



About this Template

Real-time anomaly detection in IoT sensor data, harnessing the robust capabilities of SingleStoreDB and advanced analytical techniques.

This Notebook can be run in Standard and Enterprise deployments.

Tags

vectordb, realtime

License

This Notebook has been released under the Apache 2.0 open source license.

See Notebook in action

Launch this notebook in SingleStore and start executing queries instantly.