In modern data architectures, Change Data Capture (CDC) is crucial for real-time data replication across systems. This article demonstrates how to integrate a SingleStore database with Apache Kafka using the Debezium connector for SingleStore. Under the hood, the connector tracks all create, update and delete operations on SingleStore tables.
Apache Kafka
Apache Kafka is a messaging system that allows clients to publish and read streams of data (also called events). It has an ecosystem of open-source solutions that you can combine to store, process and integrate these data streams with other parts of your system in a secure, reliable and scalable way.
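As a minimal sketch of this publish/read model, the console tools that ship with Kafka let you produce and consume events from the command line (the commands below assume the kafka container defined later in this article and a hypothetical topic named demo-topic):

# Publish events interactively (type messages, Ctrl+C to stop)
docker exec -it kafka kafka-console-producer.sh --broker-list localhost:9092 --topic demo-topic

# Read the stream of events back from the beginning
docker exec -it kafka kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo-topic --from-beginning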
Kafka Connect
To build integration solutions, you can use the Kafka Connect framework, which provides a suite of connectors to integrate Kafka with external systems. There are two types of Kafka connectors:
- Source connectors, used to move data from source systems into Kafka topics.
- Sink connectors, used to send data from Kafka topics into a target (sink) system.
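As a quick illustration, Kafka Connect exposes a REST API (on port 8083 by default); once the Connect worker from this demo is running, you can list the installed connector plugins and registered connectors with plain curl calls:

# List the connector plugins installed on the Connect worker
curl -s localhost:8083/connector-plugins

# List the connectors currently registered on the worker
curl -s localhost:8083/connectors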
Debezium
Debezium is a set of distributed services that capture row-level changes in your databases so that your applications can see and respond to those changes. Debezium records all row-level changes committed to each database table in a transaction log. Each application simply reads the transaction logs it's interested in, seeing all the events in the same order in which they occurred.
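As an illustrative sketch (the field values here are hypothetical), a Debezium change event carries the row state before and after the change, the operation type, and source metadata:

{
  "before": null,
  "after": { "id": 1, "Student_Name": "Alice", "age": 21, "marks": 95 },
  "source": { "db": "CDCOUT_KAFKA", "table": "Kafka_test_table" },
  "op": "c",
  "ts_ms": 1723224140000
}

The "op" field distinguishes creates ("c"), updates ("u") and deletes ("d"); for a delete, "after" is null and "before" holds the last state of the row.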
Demo details
Check out this blog to learn more about SingleStore’s CDC capabilities.
Prerequisites
The demo will have the following components:
- Docker and Docker Compose. Ensure Docker and Docker Compose are installed on your machine.
- SingleStore. Use SingleStore as your database to host a table.
- Apache Kafka. Kafka will serve as the messaging system.
- Zookeeper. A distributed coordination service that aids in the management of Kafka.
- Debezium. Debezium will capture and stream changes from SingleStore.
Docker and Docker Compose installation
You can install Docker using the guide found here. Depending on your platform, you will also want to ensure Docker Compose is available.
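Once installed, you can verify that both are available from a terminal:

# Verify the Docker Engine installation
docker --version

# Verify the Compose plugin is available (older installs may use the standalone docker-compose binary instead)
docker compose version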
Setting up the environment
We will use Docker Compose to run the following components:
- SingleStore database dev image: ghcr.io/singlestore-labs/singlestoredb-dev:latest
- Zookeeper image: wurstmeister/zookeeper
- Kafka image: wurstmeister/kafka
- Kafka Connect: debezium/connect:1.6
- Kafdrop: obsidiandynamics/kafdrop (optional, but it lets us view topics via a GUI)
- Download the SingleStore Debezium Connector from this link and extract it; the extracted path will be used in the compose file. A sketch of these steps follows this list.
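As a rough sketch of the download step (the archive and directory names below are assumptions; use the actual file name from your download):

# Extract the downloaded archive (the file name is an assumption; adjust to match your download)
tar -xzf singlestore-debezium-connector.tar.gz

# Note the absolute path of the extracted directory; it is referenced in the volumes section of the compose file
ls "$(pwd)/singlestore-debezium-connector"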
Here is a sample compose file for this demo, which we're calling S2cdc.yml:
version: '3.8'
services:
  # SingleStore
  singlestore:
    image: ghcr.io/singlestore-labs/singlestoredb-dev:latest
    platform: linux/x86_64
    ports:
      - 3306:3306
      - 8080:8080
    environment:
      # use the LICENSE_KEY environment variable set in the terminal:
      - SINGLESTORE_LICENSE=<YOUR LICENSE KEY OR TRIAL LICENSE KEY>
      - ROOT_PASSWORD=<YOUR ROOT PASSWORD>
  zookeeper:
    image: wurstmeister/zookeeper
    platform: linux/x86_64
    ulimits:
      nofile:
        soft: 65536
        hard: 65536
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "baeldung:1:1"
    depends_on:
      - zookeeper
  kafka-connect:
    image: debezium/connect:1.6
    platform: linux/x86_64
    container_name: kafka-connect
    ports:
      - "8083:8083"
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=connect-cluster
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
    depends_on:
      - kafka
      - zookeeper
    volumes:
      - <path to your SingleStore Debezium Connector>/singlestore-debezium-connector:/kafka/connect/singlestore
  kafdrop:
    image: obsidiandynamics/kafdrop
    platform: linux/x86_64
    container_name: kafdrop
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: kafka:9092
    depends_on:
      - kafka
Create the containers
SingleStore_CDCOUT_TO_KAFKA % docker compose -f S2cdc.yml up -d
Once the containers are created, you should see the following status:
[+] Running 6/6
 ✔ Network singlestore_cdcout_to_kafka_kafka-net          Created  0.0s
 ✔ Container zookeeper                                    Started  0.4s
 ✔ Container singlestore_cdcout_to_kafka-singlestore-1    Started  0.4s
 ✔ Container kafka                                        Started  0.4s
 ✔ Container kafdrop                                      Started  0.5s
 ✔ Container kafka-connect                                Started
Validate the containers
SingleStore_CDCOUT_TO_KAFKA % docker ps
CONTAINER ID   IMAGE                                               COMMAND                  CREATED              STATUS                        PORTS                                                      NAMES
c307da3f3dbb   debezium/connect:1.6                                "/docker-entrypoint.…"   About a minute ago   Up About a minute             8778/tcp, 9092/tcp, 0.0.0.0:8083->8083/tcp, 9779/tcp       kafka-connect
b260d3082396   obsidiandynamics/kafdrop                            "/kafdrop.sh"            About a minute ago   Up About a minute             0.0.0.0:9000->9000/tcp                                     kafdrop
3ff856911f96   wurstmeister/kafka                                  "start-kafka.sh"         About a minute ago   Up About a minute             0.0.0.0:9092->9092/tcp                                     kafka
3f4425daec34   wurstmeister/zookeeper                              "/bin/sh -c '/usr/sb…"   About a minute ago   Up About a minute             22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp         zookeeper
799a0ee5a935   ghcr.io/singlestore-labs/singlestoredb-dev:latest   "/scripts/start.sh"      About a minute ago   Up About a minute (healthy)   0.0.0.0:3306->3306/tcp, 0.0.0.0:8080->8080/tcp, 9000/tcp   singlestore_cdcout_to_kafka-singlestore-1
Exec into your Kafka Connect container to verify that the SingleStore Debezium connector JARs are present:
SingleStore_CDCOUT_TO_KAFKA % docker exec -it kafka-connect /bin/bash
[kafka@c307da3f3dbb ~]$ cd connect
[kafka@c307da3f3dbb connect]$ ls
debezium-connector-db2       debezium-connector-mongodb    debezium-connector-mysql    debezium-connector-oracle
debezium-connector-postgres  debezium-connector-sqlserver  debezium-connector-vitess   singlestore
[kafka@c307da3f3dbb connect]$ cd singlestore
[kafka@c307da3f3dbb singlestore]$ pwd
/kafka/connect/singlestore
Log in to your SingleStore cluster and create the tables that need to be streamed.
If you are using the dev image, SingleStore Studio is available at http://localhost:8080 (check the port mapping in your Docker Compose YAML).
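Alternatively, since the compose file maps the database's MySQL-protocol port to 3306 on the host, any MySQL-compatible client can connect (enter the ROOT_PASSWORD you set in the compose file when prompted):

# Connect to SingleStore from the host with a MySQL-compatible client
mysql -h 127.0.0.1 -P 3306 -u root -p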
CREATE DATABASE CDCOUT_KAFKA;
USE CDCOUT_KAFKA;
CREATE TABLE Kafka_test_table (
  id INT,
  Student_Name VARCHAR(50),
  age INT,
  marks INT
);
Register the connector to observe the cluster
curl -i -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  127.0.0.1:8083/connectors/ \
  -d '{
    "name": "singlestore-debezium-connector",
    "config": {
      "connector.class": "com.singlestore.debezium.SingleStoreConnector",
      "tasks.max": "1",
      "database.hostname": "singlestore",
      "database.port": "3306",
      "database.user": "root",
      "database.password": "Root",
      "topic.prefix": "SingleStore_CDCOUT_KAFKA_TEST",
      "database.dbname": "CDCOUT_KAFKA",
      "database.table": "Kafka_test_table",
      "delete.handling.mode": "none",
      "topic.name": "S2_CDCOUT_KAFKA"
    }
  }'
If the request succeeds, you should see a 201 Created response:
HTTP/1.1 201 Created
Date: Fri, 09 Aug 2024 17:22:20 GMT
Location: http://172.18.0.6:8083/connectors/singlestore-debezium-connector
Content-Type: application/json
Content-Length: 449
Server: Jetty(9.4.38.v20210224)

{"name":"singlestore-debezium-connector","config":{"connector.class":"com.singlestore.debezium.SingleStoreConnector","tasks.max":"1","database.hostname":"singlestore","database.port":"3306","database.user":"root","database.password":"Root","topic.prefix":"SingleStore_CDCOUT_KAFKA","database.dbname":"CDCOUT_KAFKA","database.table":"Kafka_test_table","delete.handling.mode":"none","name":"singlestore-debezium-connector"},"tasks":[],"type":"source"}
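You can also confirm that the connector and its task are running by querying the Kafka Connect REST API:

# Check the connector's status; the connector and its task should both report state RUNNING
curl -s localhost:8083/connectors/singlestore-debezium-connector/status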
Navigate to the Kafdrop UI to see the topics. It should be accessible at http://localhost:9000 (check the port mapping in your Docker Compose file).
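If you prefer the command line, you can also list the topics from inside the Kafka container (this assumes the Kafka scripts are on the container's PATH, as they are in the wurstmeister image):

# List all topics known to the broker
docker exec -it kafka kafka-topics.sh --bootstrap-server localhost:9092 --list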
Validate existing entries
Add a few entries to see whether the changes are streamed to Kafka.
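For example (the sample rows are hypothetical; each statement should produce a corresponding change event on the topic):

INSERT INTO Kafka_test_table VALUES (1, 'Alice', 21, 95);
INSERT INTO Kafka_test_table VALUES (2, 'Bob', 23, 88);
UPDATE Kafka_test_table SET marks = 90 WHERE id = 2;
DELETE FROM Kafka_test_table WHERE id = 1;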
In the Kafdrop UI, you can see that the offset count has increased.
Now, you can proceed to validate the data.
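One way is to consume the topic directly from the command line; the exact topic name depends on your connector configuration, so replace the placeholder below with the topic name shown in Kafdrop. Each message should be a Debezium change event like the one sketched earlier.

# Consume the CDC topic from the beginning (replace <your topic> with the topic name from Kafdrop)
docker exec -it kafka kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <your topic> --from-beginning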
You can see how easy it is to set up a streaming CDC feed out to Kafka from SingleStore. Check out our SingleStore documentation for more info, and start your free trial today.