Importing Data from Kafka into SingleStore using Pipelines
Input Credentials
Define the BOOTSTRAP_SERVER, PORT, TOPIC, SASL_USERNAME, SASL_MECHANISM, SECURITY_PROTOCOL, and SASL_PASSWORD variables below for integration, replacing the placeholder values with your own.
In [1]:
BOOTSTRAP_SERVER = 'bootstrap-server-url'
PORT = kafka-broker-port
TOPIC = 'kafka-topic'
SASL_USERNAME = 'username'
SASL_MECHANISM = 'sasl-mechanism'
SECURITY_PROTOCOL = 'security-protocol'
SASL_PASSWORD = 'password'
This notebook demonstrates how to create a sample table in SingleStore, set up a pipeline to import data from a Kafka topic, and run queries on the imported data. It is designed for users who want to integrate Kafka data with SingleStore and explore the capabilities of pipelines for efficient data ingestion.
Pipeline Flow Illustration
Create a Table in SingleStore
Start by creating a table that will hold the data imported from Kafka.
In [2]:
%%sql
/* Feel free to change table name and schema */
CREATE TABLE IF NOT EXISTS my_table (
    id INT,
    name VARCHAR(255),
    age INT,
    address TEXT
);
Create a Pipeline to Import Data from Kafka
You'll need to create a pipeline that pulls data from the Kafka topic into this table. This example assumes you have JSON messages in your Kafka topic.
Ensure that:
- You have access to the Kafka topic.
- Proper IAM roles or access keys are configured in SingleStore.
- The JSON message structure matches the table schema (see the example message below).
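For reference, a hypothetical JSON message that lines up with the my_table schema created above might look like this (field names and values are illustrative only):

{"id": 1, "name": "Alice", "age": 30, "address": "123 Main St"}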
Using these identifiers and keys, execute the following statement.
In [3]:
%%sql
CREATE OR REPLACE PIPELINE kafka_import_pipeline AS
LOAD DATA KAFKA '{{BOOTSTRAP_SERVER}}:{{PORT}}/{{TOPIC}}'
CONFIG '{"sasl.username": "{{SASL_USERNAME}}",
         "sasl.mechanism": "{{SASL_MECHANISM}}",
         "security.protocol": "{{SECURITY_PROTOCOL}}",
         "ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"}'
CREDENTIALS '{"sasl.password": "{{SASL_PASSWORD}}"}'
INTO TABLE my_table
FORMAT JSON;
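Before starting the pipeline, you can optionally run it against a small number of records to verify connectivity and parsing without writing to the table; a minimal sketch, assuming the pipeline created above, uses TEST PIPELINE:

%%sql
TEST PIPELINE kafka_import_pipeline LIMIT 1;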
Start the Pipeline
To start the pipeline and begin importing the data from the Kafka topic:
In [4]:
%%sql
START PIPELINE kafka_import_pipeline;
You can check the status of your pipeline in the SingleStore portal or by querying the pipeline metadata, as shown below.
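For example, a minimal status check from SQL, assuming the pipeline name used above, queries the information_schema.PIPELINES view:

%%sql
SELECT PIPELINE_NAME, STATE
FROM information_schema.PIPELINES
WHERE PIPELINE_NAME = 'kafka_import_pipeline';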
Select Data from the Table
Once the data has been imported, you can run a query to select it:
In [5]:
%%sql
SELECT * FROM my_table LIMIT 10;
Check if all of the data has been loaded:
In [6]:
%%sql
SELECT COUNT(*) FROM my_table;
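If the count is lower than expected, you can inspect ingestion errors; one way, assuming the default metadata views, is to query information_schema.PIPELINES_ERRORS:

%%sql
SELECT * FROM information_schema.PIPELINES_ERRORS
WHERE PIPELINE_NAME = 'kafka_import_pipeline';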
Conclusion
We have shown how to ingest data from a Kafka topic into SingleStoreDB using Pipelines. These techniques should enable you to integrate your Kafka topic with SingleStoreDB.
Clean up
Remove the '#' to uncomment and execute the queries below to clean up the pipeline and table you created.
Drop Pipeline
In [7]:
%%sql
#STOP PIPELINE kafka_import_pipeline;
#DROP PIPELINE kafka_import_pipeline;
Drop Data
In [8]:
%%sql
#DROP TABLE my_table;
Details
About this Template
This notebook demonstrates how to create a sample table in SingleStore and set up a pipeline to import data from a Kafka topic.
This Notebook can be run in Standard and Enterprise deployments.
Tags
License
This Notebook has been released under the Apache 2.0 open source license.