In the era of the Internet of Things (IoT), data is generated at an unprecedented rate.
This data has the potential to provide insights that drive business value and innovation — but with the sheer volume of data, it’s essential to identify and act upon any anomalies that may arise in real time.
The Challenges of IoT Data
IoT data arrives at high velocity and in high volume, which makes it hard to spot the rare occurrences that differ significantly from standard behavior. The main challenges are real-time data ingestion, high response times, slow queries and the need for a database that supports both SQL and vector processing. Analytics projects also need a single integrated environment.
Introducing SingleStore
SingleStore offers a suite of features to address the challenges present with IoT data. SingleStoreDB is a database that provides high-velocity data ingestion pipelines, fast analytics, hybrid transaction-analytical processing and enhanced vector support.
In our first-ever demothon, we were tasked with building a power demo based around SingleStore 8.1 features to articulate the critical capabilities and business value of any given solution. Our team chose to demonstrate SingleStore’s prowess in real-time anomaly detection for IoT data. Read on to see the features we highlighted, and how our demonstration worked.
High-velocity data ingestion pipelines
SingleStore enables lightning-fast, real-time data ingestion pipelines that efficiently handle high volumes of data streaming into the system. These pipelines capture, process and load data with minimal latency, enabling real-time analytics and decision making.
Hybrid Transaction-Analytical Processing (HTAP)
SingleStore provides robust support for both transactional (OLTP) and analytical (OLAP) workloads within a single database platform. This unified approach allows organizations to handle operational transactions, while concurrently running complex analytical queries.
Enhanced vector support
SingleStore uses vector processing to accelerate data processing and analytics, which yields significant performance gains and improved efficiency for complex analytical workloads.
You can read more about our built-in vector capabilities here.
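To make the idea of vector similarity concrete, here is a minimal Python sketch (standard library only) of the dot product on L2-normalized vectors. For unit-length vectors, the dot product equals the cosine similarity, so a value close to 1 means two vectors point in nearly the same direction. The helper names and the three sample readings are hypothetical and exist only for illustration; the demo itself compares UMAP embeddings rather than raw sensor values.

```python
import math


def l2_normalize(vector):
    """Scale a vector to unit length (L2 norm of 1)."""
    norm = math.sqrt(sum(x * x for x in vector))
    if norm == 0:
        raise ValueError("cannot normalize a zero vector")
    return [x / norm for x in vector]


def dot_product(a, b):
    """Sum of the element-wise products of two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    return sum(x * y for x, y in zip(a, b))


# Hypothetical (wind, rain, temperature) readings, for illustration only.
calm_day = l2_normalize([5.0, 0.0, 20.0])
similar_day = l2_normalize([6.0, 0.5, 21.0])
stormy_day = l2_normalize([60.0, 40.0, 8.0])

# Similar readings score close to 1; dissimilar readings score lower.
similarity_close = dot_product(calm_day, similar_day)
similarity_far = dot_product(calm_day, stormy_day)
print(similarity_close > similarity_far)
```

Normalizing once at write time means every later comparison is a plain dot product, which is why the demo normalizes its embeddings before storing them.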
Live analytics and interactive dashboards
SingleStore empowers users to perform live analytics and create interactive dashboards through a built-in notebook feature. This intuitive interface enables data exploration, ad-hoc analysis and visualization, facilitating real-time insights and data-driven decision making.
The Architecture
The architecture consists of the following major components: a source system, Kafka, SingleStore Pipelines, SingleStore Notebooks, Python (and its libraries) and vector processing. SingleStore Notebooks acts as the integrated environment for analytics: Python libraries handle data processing, and SingleStore handles data storage and retrieval.
The Demonstration
Our team presented a five-minute demo showcasing real-time data ingestion with SingleStore Pipelines. We used Python in SingleStore's Notebook environment to generate vector embeddings, and SingleStore's SQL support for vector processing (including the dot_product function) to compare them.
The demo highlighted SingleStore's ability to handle both transactional and analytical queries, and visualized time-series data for effective anomaly monitoring.
-- Creating a new database named 'iot_sensor_db'
CREATE DATABASE iot_sensor_db;
USE iot_sensor_db;
-- Creating a table 'sensor_data_with_vectors' with specified columns
CREATE TABLE sensor_data_with_vectors (
    date DATETIME,          -- Date of the sensor reading
    city VARCHAR(50),       -- City where the sensor is located
    longitude VARCHAR(50),  -- Longitude of the sensor location
    latitude VARCHAR(50),   -- Latitude of the sensor location
    vent FLOAT(8,2),        -- Wind speed data from the sensor
    pluie FLOAT(8,2),       -- Rainfall data from the sensor
    temp FLOAT(8,2),        -- Temperature data from the sensor
    anomaly VARCHAR(10),    -- Anomaly detection result
    embeddings TEXT         -- Vector embeddings of the sensor data
);
-- Creating a staging table 'sensor_data_stage' with specified columns
CREATE TABLE sensor_data_stage (
    date DATETIME,          -- Date of the sensor reading
    city VARCHAR(50),       -- City where the sensor is located
    longitude VARCHAR(50),  -- Longitude of the sensor location
    latitude VARCHAR(50),   -- Latitude of the sensor location
    vent FLOAT(8,2),        -- Wind speed data from the sensor
    pluie FLOAT(8,2),       -- Rainfall data from the sensor
    temp FLOAT(8,2),        -- Temperature data from the sensor
    embeddings TEXT         -- Vector embeddings of the sensor data
);
-- Creating a pipeline 'sensor_data_pipeline' to load historical data from an S3 bucket
CREATE OR REPLACE PIPELINE sensor_data_pipeline AS
LOAD DATA S3 's3://gpsteam/demothon/with_cities_embeddings.csv'
CREDENTIALS '{"aws_access_key_id": "<your_aws_access_key_id>",
              "aws_secret_access_key": "<your_aws_secret_access_key>",
              "aws_session_token": "<your_aws_session_token>"}'
INTO TABLE sensor_data_with_vectors
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\n'
IGNORE 1 LINES;
-- Starting the 'sensor_data_pipeline'
START PIPELINE sensor_data_pipeline;
-- Creating a pipeline 'sensor_data_kafka_pipeline' to load real-time data from a Kafka topic
CREATE OR REPLACE PIPELINE sensor_data_kafka_pipeline
AS LOAD DATA KAFKA '<your_kafka_broker>:9094/<your_topic_name>'
CONFIG '{
    "security.protocol" : "SASL_SSL",
    "sasl.mechanism" : "SCRAM-SHA-256",
    "sasl.username" : "<your_username>",
    "ssl.ca.location" : "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"}'
CREDENTIALS '{
    "sasl.password" : "<your_password>"}'
INTO TABLE sensor_data_stage
FORMAT JSON (
    date <- %::date,
    city <- %::city,
    longitude <- %::longitude,
    latitude <- %::latitude,
    vent <- %::vent,
    pluie <- %::pluie,
    temp <- %::temp,
    embeddings <- %::embeddings
);
-- Starting the 'sensor_data_kafka_pipeline'
START PIPELINE sensor_data_kafka_pipeline;
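The FORMAT JSON mapping in the Kafka pipeline above expects every message on the topic to be a JSON object carrying the eight keys it names. The following Python sketch shows one way a producer could serialize such a message. The helper name, the sample values and the timestamp format are hypothetical. Sending embeddings as a plain JSON array is also an assumption, since the source does not show the message format.

```python
import json
from datetime import datetime

# Key names taken from the FORMAT JSON mapping of the Kafka pipeline.
EXPECTED_FIELDS = ("date", "city", "longitude", "latitude",
                   "vent", "pluie", "temp", "embeddings")


def make_sensor_message(reading_time, city, longitude, latitude,
                        vent, pluie, temp, embeddings):
    """Serialize one sensor reading as a JSON object (hypothetical helper)."""
    message = {
        # Assumption: a 'YYYY-MM-DD HH:MM:SS' string for the DATETIME column.
        "date": reading_time.strftime("%Y-%m-%d %H:%M:%S"),
        "city": city,
        # Coordinates are stored as VARCHAR in the staging table.
        "longitude": str(longitude),
        "latitude": str(latitude),
        "vent": float(vent),
        "pluie": float(pluie),
        "temp": float(temp),
        # Assumption: embeddings are sent as a plain JSON array of floats.
        "embeddings": [float(x) for x in embeddings],
    }
    return json.dumps(message)


# Made-up reading, for illustration only.
payload = make_sensor_message(
    datetime(2023, 7, 1, 12, 0, 0), "Paris", 2.3522, 48.8566,
    vent=12.5, pluie=0.0, temp=24.1, embeddings=[0.6, 0.8],
)
print(payload)
```

The resulting string is what a Kafka producer would publish as the message value; the pipeline then maps each key to the staging-table column of the same name.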
# Python code for data preparation and generating vector embeddings
# Install the umap-learn library (notebook shell command) before importing it
!pip install umap-learn
# Import the necessary libraries
import json
import pandas as pd
import umap
from sklearn.preprocessing import normalize
from sqlalchemy import create_engine
# Create a connection to the database
engine = create_engine(connection_url)
# Query the database to get the data from the 'sensor_data_stage' table
df = pd.read_sql('select * from sensor_data_stage where date', engine)
# Fill some null values using the backward-fill method
df = df.bfill(axis=0)
# Remove the rows that still contain null values
new_df = df.dropna().copy()
# Select the features for generating embeddings
features = new_df[['vent', 'pluie', 'temp']]
# Create a UMAP reducer with 15 components
reducer = umap.UMAP(n_components=15)
# Fit the reducer to the features and transform the features
embeddings = reducer.fit_transform(features)
# Normalize the embeddings
normalized_embeddings = normalize(embeddings, norm='l2')
# Add the embeddings to the DataFrame
new_df['embeddings'] = list(normalized_embeddings)
# Iterate over each row in the new DataFrame
for index, row in new_df.iterrows():
    # Get the embeddings from the current row
    embeddings = row['embeddings']
    # Convert the embedding to a list of floats and then to a JSON string
    embeddings_json = json.dumps([float(x) for x in embeddings])
    # Create the query string: take the 20 most similar stored vectors
    # and count how often each anomaly label occurs among them
    query = f"""
        SELECT anomaly, COUNT(anomaly) as count
        FROM (
            SELECT anomaly, dot_product(
                JSON_ARRAY_PACK('{embeddings_json}'),
                JSON_ARRAY_PACK(sensor_data_with_vectors.embeddings)
            ) AS similarity
            FROM sensor_data_with_vectors
            ORDER BY similarity DESC
            LIMIT 20
        ) AS subquery
        GROUP BY anomaly
        ORDER BY count DESC;
    """
    # Execute the query
    result = pd.read_sql_query(query, con=engine)
    # Check if the result is empty
    if not result.empty:
        # Assign the most frequent label to the current row
        new_df.loc[index, 'anomaly'] = result['anomaly'].values[0]
    else:
        # No stored vectors to compare against: fall back to a default value
        new_df.loc[index, 'anomaly'] = 'none'
# Convert the data types of the columns in the new DataFrame
new_df['date'] = pd.to_datetime(new_df['date'])
new_df['city'] = new_df['city'].astype(str)
new_df['longitude'] = new_df['longitude'].astype(str)
new_df['latitude'] = new_df['latitude'].astype(str)
new_df['vent'] = new_df['vent'].astype(float)
new_df['pluie'] = new_df['pluie'].astype(float)
new_df['temp'] = new_df['temp'].astype(float)
new_df['anomaly'] = new_df['anomaly'].astype(str)
# Store each embedding as a JSON array string so JSON_ARRAY_PACK can parse it
new_df['embeddings'] = new_df['embeddings'].apply(
    lambda v: json.dumps([float(x) for x in v]))
# Append the new DataFrame to the 'sensor_data_with_vectors' table in the database
new_df.to_sql('sensor_data_with_vectors', con=engine,
              if_exists='append', index=False)
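The SQL query inside the loop above amounts to a k-nearest-neighbor vote: it ranks the stored vectors by dot product against the new embedding, keeps the top 20 and picks the most frequent anomaly label among them. The sketch below reproduces that logic in plain Python on a tiny in-memory data set, to show what the query computes. The function name, the two-dimensional vectors and the labels are all hypothetical, and in the demo the ranking runs inside SingleStore rather than in Python.

```python
from collections import Counter


def classify_by_neighbors(query_vector, labeled_vectors, k=20):
    """Return the most common label among the k stored vectors with the
    highest dot product to query_vector, or 'none' if nothing is stored."""
    scored = [
        (sum(q * v for q, v in zip(query_vector, vector)), label)
        for vector, label in labeled_vectors
    ]
    # Highest similarity first, mirroring ORDER BY similarity DESC LIMIT k.
    scored.sort(key=lambda pair: pair[0], reverse=True)
    top_labels = [label for _, label in scored[:k]]
    if not top_labels:
        return "none"
    # most_common(1) plays the role of GROUP BY anomaly ORDER BY count DESC.
    return Counter(top_labels).most_common(1)[0][0]


# Hypothetical labeled history: near-unit vectors with made-up labels.
history = [
    ([1.0, 0.0], "normal"),
    ([0.98, 0.199], "normal"),
    ([0.0, 1.0], "high_wind"),
    ([0.1, 0.995], "high_wind"),
    ([0.6, 0.8], "high_wind"),
]

label = classify_by_neighbors([0.2, 0.98], history, k=3)
print(label)
```

When two labels are tied, this sketch returns whichever label appears first among the ranked neighbors, whereas the SQL query's tie order is not defined, so the two can differ on ties.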
Displaying real-time data in a graph:
import pandas as pd
from sqlalchemy import create_engine
import plotly.express as px
engine = create_engine(connection_url)
df = pd.read_sql('select * from sensor_data_with_vectors limit 50000;', engine)
df['date'] = pd.to_datetime(df['date'])
df['date_only'] = df['date'].dt.date
# Group data by date and anomaly, then count the instances
grouped_df = df.groupby(['date_only', 'anomaly']).size().reset_index(name='counts')
# Create line plot with Plotly
fig = px.line(
    grouped_df, x='date_only', y='counts', color='anomaly',
    title='Anomalies over Time',
    labels={'date_only': 'Date', 'counts': 'Anomaly Count'},
)
# Show plot
fig.show()
Business Benefits
Implementing real-time anomaly detection in IoT data has several benefits:
- Early anomaly detection. Quickly identify and address anomalies to prevent cascading failures, and optimize performance.
- Better route optimization. For logistics and transportation, real-time data can help optimize routes.
- Risk mitigation strategies. Early detection of anomalies allows businesses to develop strategies to mitigate risks.
- Optimized maintenance schedules. Predictive maintenance can be performed by analyzing the data trends.
- Reduced latency. Faster data processing and reduced response times.
- Single integrated environment. A single environment for both transactional and analytical data processing simplifies operations.
Conclusion
Real-time anomaly detection in IoT data is imperative for businesses looking to leverage the full potential of IoT. SingleStoreDB offers a powerful set of features that helps organizations achieve this. With high-velocity data ingestion pipelines, enhanced vector support and HTAP capabilities, SingleStore is well equipped to handle the challenges posed by IoT data.
Thank you for reading! Check out these additional resources — and stay tuned for more blogs from our demothon teams.
Additional resources: