
IT Threat Detection, Part 3

Note
This tutorial is meant for Standard and Premium workspaces. You can't run it in a Free Starter Workspace due to restrictions on storage. Create a workspace using + Group in the left nav and select Standard for this notebook. Gallery notebooks tagged with "Starter" are suitable to run on a Free Starter Workspace.
Get pipeline data from Confluent (Kafka)
For this step, we recommend using a workspace of size S1 or larger.
Action Required
Make sure to select the siem_log_kafka_demo database from the drop-down menu at the top of this notebook. It updates the connection_url, which the %%sql magic command and SQLAlchemy use to connect to the selected database.
In [1]:
%%sql
DROP PIPELINE IF EXISTS `siem_log_real`;
DROP TABLE IF EXISTS `siem_log_real`;
We start by creating a simple table that loads the logs into a JSON column.
In [2]:
%%sql
CREATE TABLE IF NOT EXISTS `siem_log_real` (
    `logs` JSON COLLATE utf8_bin,
    SHARD KEY ()
) AUTOSTATS_CARDINALITY_MODE=PERIODIC AUTOSTATS_HISTOGRAM_MODE=CREATE SQL_MODE='STRICT_ALL_TABLES';
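Each Kafka message is expected to be a single JSON document that lands in the logs column as-is. If you want to sanity-check the table before wiring up Kafka, you can insert a hand-written document and then remove it; the field names and values below are only an illustration, inferred from the columns extracted later in this notebook.
-- Hypothetical sample document; real messages come from the Kafka topic
INSERT INTO `siem_log_real` (`logs`) VALUES
    ('{"Timestamp": "2023-01-01 00:00:00", "Log_ID": 1, "model_res": [0.1, 0.2, 0.3]}');
SELECT * FROM `siem_log_real`;
-- Clean up the test row before starting the pipeline
DELETE FROM `siem_log_real`;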
We create a pipeline from the Confluent cluster with a batch interval of 20 ms.
In [3]:
%%sql
CREATE PIPELINE `siem_log_real`
AS LOAD DATA KAFKA 'pkc-p11xm.us-east-1.aws.confluent.cloud:9092/singlestore_topic'
CONFIG '{\"sasl.username\": \"WTIVCYPLUAIMIAYQ\",\n \"sasl.mechanism\": \"PLAIN\",\n \"security.protocol\": \"SASL_SSL\",\n \"ssl.ca.location\": \"/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem\"}'
CREDENTIALS '{"sasl.password": "/qIOhlTFEK8RNNCc1qSOnpNj4mqhXfudBlQQFgRfc0qBEjfm99VcyvEuwPILBcnv"}'
BATCH_INTERVAL 20
DISABLE OFFSETS METADATA GC
INTO TABLE `siem_log_real`
FIELDS TERMINATED BY '\t' ENCLOSED BY '' ESCAPED BY '\\'
LINES TERMINATED BY '\n' STARTING BY '';
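Before starting it, you can optionally dry-run the pipeline to confirm connectivity to Confluent and preview a few raw records without writing anything to the table. A minimal check (assuming the pipeline was created successfully) is:
-- Pull a single batch from the topic and display it instead of loading it
TEST PIPELINE `siem_log_real` LIMIT 1;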
Let's start the pipeline
In [4]:
%%sql
START PIPELINE siem_log_real;
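Once the pipeline is running, you can verify that it is ingesting without errors. One way to check, using SingleStore's pipeline metadata (a sketch; the exact columns may vary by version), is:
-- Overall pipeline state
SHOW PIPELINES;
-- Any ingestion errors recorded for this pipeline
SELECT * FROM information_schema.PIPELINES_ERRORS
WHERE PIPELINE_NAME = 'siem_log_real';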
We extract a few elements from the JSON column, such as the timestamp, the Log_ID, and the model result vector, which is stored in blob format. Because these are persisted computed columns, the values are extracted as soon as new rows land in the table.
In [5]:
%%sql
ALTER TABLE siem_log_real
    ADD COLUMN Timestamp AS JSON_EXTRACT_STRING(`logs`, 'Timestamp') PERSISTED DATETIME,
    ADD COLUMN model_res_blob AS JSON_ARRAY_PACK_F32(JSON_EXTRACT_STRING(`logs`, 'model_res')) PERSISTED BLOB,
    ADD COLUMN Log_ID AS JSON_EXTRACT_BIGINT(`logs`, 'Log_ID') PERSISTED BIGINT;
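A quick way to confirm that the persisted columns are being populated as data streams in (a sketch; the LENGTH of the blob is just a proxy for the size of the packed vector):
SELECT Log_ID, Timestamp, LENGTH(model_res_blob) AS vector_bytes
FROM siem_log_real
ORDER BY Timestamp DESC
LIMIT 5;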
Install libraries for real-time dashboarding with Perspective
In [6]:
%pip install perspective-python --quiet
In [7]:
import perspective
import threading
import random
import time
from datetime import datetime, date
from perspective import Table, PerspectiveWidget
import warnings
warnings.filterwarnings('ignore')
We set up the dashboard with a refresh rate of 500 ms. We use two modes, run and stop, so that a dashboard can be told to stop retrieving results from the database.
In [8]:
def loop():
    # Keep refreshing the Perspective table until mode is set to 'stop'
    while mode != 'stop':
        while mode == 'run':
            table.update(data_source())
            time.sleep(0.5)  # 500 ms refresh rate
Track Real-Time Connections
In [9]:
def data_source():
    result = %sql select Timestamp, count(*) as count_connections from siem_log_real group by Timestamp order by Timestamp desc limit 100
    result2 = list(result.dicts())
    return result2

SCHEMA = {
    "Timestamp": datetime,
    "count_connections": int
}
In [10]:
mode = 'run'
table = perspective.Table(SCHEMA, limit=100)
threading.Thread(target=loop).start()
In [11]:
perspective.PerspectiveWidget(table, title="Track Real-Time Connections", group_by=["Timestamp"], plugin="Y Line", columns=["count_connections"])
In [12]:
mode = 'stop'
Monitor and Infer IT Threats using Semantic Search over Real-Time Data
In [13]:
def data_source():
    result = %sql WITH test_sql AS (SELECT Log_ID, TIMESTAMP, id, EUCLIDEAN_DISTANCE(model_res_blob, Model_Results) AS score,ROW_NUMBER() OVER(PARTITION BY Log_ID ORDER BY EUCLIDEAN_DISTANCE(model_res_blob, Model_Results) ASC) AS rn FROM (SELECT TIMESTAMP, Log_ID, model_res_blob FROM siem_log_real ORDER BY TIMESTAMP DESC LIMIT 20) CROSS JOIN model_results where score IS NOT NULL),label_table AS (SELECT Log_ID, TIMESTAMP,MAX(CASE WHEN id LIKE 'Bru%' OR id LIKE 'SQL%' THEN 'Malicious' ELSE 'Benign' END) as log_status FROM test_sql WHERE rn <= 100 GROUP BY Log_ID,TIMESTAMP order by TIMESTAMP DESC) SELECT log_status, count(Log_ID) as count_connections FROM label_table group by log_status;
    result2 = list(result.dicts())
    return result2

SCHEMA = {
    "log_status": str,
    "count_connections": int
}
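Because %sql is a line magic, the query above has to fit on one line. For readability, here is the same semantic-search query laid out as plain SQL: it takes the 20 most recent logs, computes the Euclidean distance between each log's packed vector and every row of the model_results table (assumed to be populated in an earlier part of this series), keeps the 100 nearest matches per log, and labels a log Malicious if any matching id starts with 'Bru' or 'SQL'.
WITH test_sql AS (
    SELECT Log_ID, TIMESTAMP, id,
           EUCLIDEAN_DISTANCE(model_res_blob, Model_Results) AS score,
           ROW_NUMBER() OVER (
               PARTITION BY Log_ID
               ORDER BY EUCLIDEAN_DISTANCE(model_res_blob, Model_Results) ASC
           ) AS rn
    FROM (SELECT TIMESTAMP, Log_ID, model_res_blob
          FROM siem_log_real
          ORDER BY TIMESTAMP DESC
          LIMIT 20)
    CROSS JOIN model_results
    WHERE score IS NOT NULL
),
label_table AS (
    SELECT Log_ID, TIMESTAMP,
           MAX(CASE WHEN id LIKE 'Bru%' OR id LIKE 'SQL%'
                    THEN 'Malicious' ELSE 'Benign' END) AS log_status
    FROM test_sql
    WHERE rn <= 100
    GROUP BY Log_ID, TIMESTAMP
    ORDER BY TIMESTAMP DESC
)
SELECT log_status, COUNT(Log_ID) AS count_connections
FROM label_table
GROUP BY log_status;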
In [14]:
mode = 'run'
table = perspective.Table(SCHEMA, limit=100)
threading.Thread(target=loop).start()
In [15]:
perspective.PerspectiveWidget(table, title="Monitor Threat Inference", split_by=["log_status"], plugin="Y Line", columns=["count_connections"])
In [16]:
mode = 'stop'
Track the Latest Connections with Threat Inference by Log ID
In [17]:
def data_source():
    result = %sql WITH test_sql AS (SELECT Log_ID, TIMESTAMP, id, EUCLIDEAN_DISTANCE(model_res_blob, Model_Results) AS score,ROW_NUMBER() OVER(PARTITION BY Log_ID ORDER BY EUCLIDEAN_DISTANCE(model_res_blob, Model_Results) ASC) AS rn FROM (SELECT TIMESTAMP, Log_ID, model_res_blob FROM siem_log_real ORDER BY TIMESTAMP DESC LIMIT 20) CROSS JOIN model_results where score IS NOT NULL),label_table AS (SELECT Log_ID, TIMESTAMP,MAX(CASE WHEN id LIKE 'Bru%' OR id LIKE 'SQL%' THEN 'Malicious' ELSE 'Benign' END) as log_status FROM test_sql WHERE rn <= 100 GROUP BY Log_ID,TIMESTAMP order by TIMESTAMP DESC) SELECT * FROM label_table;
    result2 = list(result.dicts())
    return result2

SCHEMA = {
    "Log_ID": str,
    "TIMESTAMP": datetime,
    "log_status": str
}
In [18]:
mode = 'run'
table = perspective.Table(SCHEMA, limit=20)
threading.Thread(target=loop).start()
In [19]:
perspective.PerspectiveWidget(table, title="Latest Connections", group_by=["TIMESTAMP"], plugin="Datagrid", columns=["Log_ID", "log_status"])
In [20]:
mode = 'stop'

Details
About this Template
Part 3 of real-time threat detection: integrate with Kafka, then run and visualize threat detection on incoming logs. This notebook requires adjustments to work out of the box.
This Notebook can be run in Standard and Enterprise deployments.
License
This Notebook has been released under the Apache 2.0 open source license.