

Change Data Capture for SingleStore

In the modern data stack, real-time insights and seamless integration are essential for organizations aiming to stay ahead.


SingleStore, the database that helps power real-time intelligent applications, has added yet another powerful capability for building such applications: public preview support for Change Data Capture (CDC). This latest feature promises to revolutionize how businesses capture and utilize data, empowering them with enhanced agility and efficiency in their operations.

Change Data Capture (CDC) exposes an event stream of every alteration made to data within a specified database. It captures events including inserts, updates, deletions and even schema changes to tables in real time.

SingleStore’s CDC support marks a significant advancement in data integration capabilities. By enabling real-time data synchronization from SingleStore databases to external systems, organizations can now:

  • Enhance decision-making. Access up-to-date data across systems for faster and more informed decision-making processes.
  • Ensure data consistency. Maintain consistency between operational databases and analytics platforms, ensuring that insights are based on the latest information.
  • Facilitate real-time analytics. Empower analytics and business intelligence tools with live data feeds, enabling real-time reporting and analysis.

Q: Why would I use CDC?

The need for CDC typically falls somewhere on the spectrum between "needing to move data from one location to another" and "needing to consume data as it's being created". At the far left of that spectrum we find the replication scenarios, e.g. a migration from one database vendor to another, or a switch from an on-premises deployment to a cloud deployment.

On the right-hand side, we find the pure consumers of data, e.g. notification systems, triggers, stream processing and event-based architecture. However, most scenarios are typically found somewhere in the middle:

  • ETL pipelines need to receive data in real time while transforming it on the way to a target system
  • Fan-out (e.g. edge provisioning) requires filtering/copying data
  • Fan-in (e.g. data reconciliation) requires multiplexing/aggregation

Q: Can’t I just BACKUP my database and import it?

Of course you could! However (… pause for dramatic effect …) how will you make sure everything has been copied? Will you stop all active writes for the duration of the copy? Do you have enough memory and disk space available to make the copies? How will you transfer the copies? How will you load the copies? What if you need to transform parts of the data? What if you only need to replicate a subset of the database, like a single table? Or a few columns? What if you need to keep multiple systems in sync continuously?

With CDC it becomes possible to do online replication, where a snapshot of the database is migrated first, followed by a stream of incremental changes applied afterward. This change stream can run in parallel with active writes, and will continuously observe and apply new changes. As change events are sent in near real time, you can also avoid a large consistency gap between the source and target systems.
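The snapshot-then-stream flow described above can be sketched as a toy model. This is purely illustrative: the event shapes and helper functions below are made up for the example and do not reflect SingleStore's actual wire format.

```python
# Toy model of online replication: load a point-in-time snapshot,
# then apply the incremental change stream captured afterward.

def apply_snapshot(target, snapshot_rows):
    """Bulk-load the point-in-time snapshot into the target."""
    for key, row in snapshot_rows:
        target[key] = row

def apply_change(target, event):
    """Apply one incremental change event captured after the snapshot."""
    kind, key, row = event
    if kind in ("Insert", "Update"):
        target[key] = row
    elif kind == "Delete":
        target.pop(key, None)

target = {}
apply_snapshot(target, [(1, "KorE"), (2, "PamY")])
# Writes that happened while (and after) the snapshot was being copied:
for event in [("Insert", 3, "TabK"), ("Update", 2, "JonA")]:
    apply_change(target, event)
```

Because the change stream replays everything since the snapshot, the target converges on the source without ever pausing writes.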

CDC deep dive

Q: Break me off a piece of that CDC-pie! How does that work again?

I’m glad you asked! At SingleStore, we want to make the barrier to entry for CDC as small as possible. No need to spin up a CDC cluster, build complex pipelines or create table/database tracking copies. CDC is immediately available through the OBSERVE query, a new type of Data Manipulation Language (DML) query.

The OBSERVE query provides row-based change record information as a long-running streaming query. Every insert, update and delete to a row of a table matched by the query’s table filter is sent in the result set.

CREATE DATABASE dbTest;
USE dbTest;
CREATE TABLE test1(
ID INT NOT NULL PRIMARY KEY,
Code VARCHAR(4));
/* Start OBSERVE query here on a separate connection, see below. */
INSERT INTO test1(ID, Code) VALUES (1, "KorE");
INSERT INTO test1(ID, Code) VALUES (2, "PamY");
INSERT INTO test1(ID, Code) VALUES (3, "TabK");
UPDATE test1 SET Code = "JonA" WHERE ID = 2;
> OBSERVE * FROM test1;
| Offset | PartitionId | Type | Table | TxId | TxPartitions | InternalId | ID | Code |
|--------|-------------|------|-------|------|--------------|------------|----|------|
| 0x0000000000000077000000000000000E000000000000E06E | 4 | BeginTransaction | | 0x0100000000000000040000000000000077000000000000000E000000000000E06E0000000000000000 | 1 | 0 | NULL | NULL |
| 0x0000000000000077000000000000000E000000000000E087 | 4 | Insert | test1 | 0x0100000000000000040000000000000077000000000000000E000000000000E06E0000000000000000 | 1 | 1152921504606846977 | 1 | KorE |
| 0x0000000000000077000000000000000E000000000000E088 | 4 | CommitTransaction | | 0x0100000000000000040000000000000077000000000000000E000000000000E06E0000000000000000 | 1 | 0 | NULL | NULL |
| 0x0000000000000077000000000000000F000000000000F039 | 4 | BeginTransaction | | 0x0100000000000000040000000000000077000000000000000F000000000000F0390000000000000000 | 1 | 0 | NULL | NULL |
| 0x0000000000000077000000000000000F000000000000F052 | 4 | Insert | test1 | 0x0100000000000000040000000000000077000000000000000F000000000000F0390000000000000000 | 1 | 1152921504606846978 | 3 | TabK |
| 0x0000000000000077000000000000000F000000000000F053 | 4 | CommitTransaction | | 0x0100000000000000040000000000000077000000000000000F000000000000F0390000000000000000 | 1 | 0 | NULL | NULL |
| 0x0000000000000078000000000000000C000000000000C06E | 3 | BeginTransaction | | 0x0100000000000000030000000000000078000000000000000C000000000000C06E0000000000000000 | 1 | 0 | NULL | NULL |
| 0x0000000000000078000000000000000C000000000000C087 | 3 | Insert | test1 | 0x0100000000000000030000000000000078000000000000000C000000000000C06E0000000000000000 | 1 | 1152921504606846977 | 2 | PamY |
| 0x0000000000000078000000000000000C000000000000C088 | 3 | CommitTransaction | | 0x0100000000000000030000000000000078000000000000000C000000000000C06E0000000000000000 | 1 | 0 | NULL | NULL |
| 0x0000000000000078000000000000000E000000000000E039 | 3 | BeginTransaction | | 0x0100000000000000030000000000000078000000000000000E000000000000E0390000000000000000 | 1 | 0 | NULL | NULL |
| 0x0000000000000078000000000000000E000000000000E052 | 3 | Update | test1 | 0x0100000000000000030000000000000078000000000000000E000000000000E0390000000000000000 | 1 | 1152921504606846977 | 2 | JonA |
| 0x0000000000000078000000000000000E000000000000E053 | 3 | CommitTransaction | | 0x0100000000000000030000000000000078000000000000000E000000000000E0390000000000000000 | 1 | 0 | NULL | NULL |

OBSERVE is a log-based CDC, meaning it uses the existing Write-Ahead Log (WAL) of the database to source the information. Implementing a log-based CDC stream is nothing new, as MySQL’s Binlog has been the de facto standard reference in log-based CDCs for some time. However, we are able to leverage SingleStore’s Universal Storage engine to grant our CDC stream some useful properties.

The OBSERVE output is guaranteed to only contain committed data, meaning clients don’t need to buffer transactions because they might be rolled back at the end. Instead, as soon as a transaction’s records are made visible, a client can optimistically start writing them to the target system. The OBSERVE query will internally filter aborted and failed transactions.

Going distributed with OBSERVE

Q: Isn’t SingleStore distributed?  Doesn’t each partition have its own WAL?

Exactly! Compared to a system like MySQL, SingleStore is a partitioned database with data sharded across different nodes, each partition operating its own WAL. As data can be committed to these nodes either independently or as a coordinated transaction spanning multiple partitions, we need a way to multiplex the partition streams together.

The aggregator executing the OBSERVE will multiplex the individual streams together into a single streaming output. Each individual change record is annotated to indicate which partition it was written to, helping distinguish the potentially interleaved output.

> OBSERVE * FROM sharded_db;
| Offset | PartitionId | Type | Table | TxId | TxPartitions | InternalId | ID | Code |
|--------|-------------|------|-------|------|--------------|------------|----|------|
| 0x0000000000000077000000000000000E000000000000E06E | 1 | BeginTransaction | | 0x0100000000000000040000000000000077000000000000000E000000000000E06E0000000000000000 | 1 | 0 | NULL | NULL |
| 0x0000000000000077000000000000000E000000000000E087 | 1 | Insert | test1 | 0x0100000000000000040000000000000077000000000000000E000000000000E06E0000000000000000 | 1 | 1152921504606846977 | 1 | KorE |
| 0x0000000000000077000000000000000F000000000000F039 | 2 | BeginTransaction | | 0x0100000000000000040000000000000077000000000000000F000000000000F0390000000000000000 | 1 | 0 | NULL | NULL |
| 0x0000000000000077000000000000000F000000000000F052 | 2 | Insert | test1 | 0x0100000000000000040000000000000077000000000000000F000000000000F0390000000000000000 | 1 | 1152921504606846978 | 2 | TabK |
| 0x0000000000000078000000000000000C000000000000C088 | 1 | CommitTransaction | | 0x0100000000000000030000000000000078000000000000000C000000000000C06E0000000000000000 | 1 | 0 | NULL | NULL |
| 0x0000000000000078000000000000000E000000000000E052 | 2 | Update | test1 | 0x0100000000000000030000000000000078000000000000000E000000000000E0390000000000000000 | 1 | 1152921504606846979 | 3 | JonA |
| 0x0000000000000078000000000000000E000000000000E053 | 2 | CommitTransaction | | 0x0100000000000000030000000000000078000000000000000E000000000000E0390000000000000000 | 1 | 0 | NULL | NULL |
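Conceptually, the aggregator's job can be modeled as a round-robin merge of independent per-partition iterators, tagging each record with its PartitionId. This is an illustrative model only, not SingleStore's actual implementation, and the record contents are simplified placeholders:

```python
# Sketch: merge per-partition change streams into one annotated stream.
# Order within a partition is preserved; order between partitions is
# arbitrary, matching the interleaving seen in the OBSERVE output.

def multiplex(partition_streams):
    """Round-robin merge of {partition_id: iterable-of-records}."""
    iters = {pid: iter(s) for pid, s in partition_streams.items()}
    while iters:
        for pid in list(iters):
            try:
                record = next(iters[pid])
            except StopIteration:
                del iters[pid]  # this partition's stream is exhausted
                continue
            yield {"PartitionId": pid, **record}

streams = {
    1: [{"Type": "BeginTransaction"}, {"Type": "Insert", "ID": 1}],
    2: [{"Type": "BeginTransaction"}, {"Type": "Insert", "ID": 2}],
}
merged = list(multiplex(streams))
```

Note that each partition's records come out in their original order, even though records from different partitions interleave.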

Q: What if I had a transaction that wrote to multiple partitions?

In case the change was part of a transaction spanning multiple partitions, the number of partitions the transaction was a part of can be found in the TxPartitions column. The TxId column will contain the unique (within a database) transaction identifier shared between all partitions. This allows clients to align changes between partitions and reconstruct a transaction between partitions. Importantly, our guarantee for only returning committed data holds true for distributed transactions — so you can skip performing alignment if it's not necessary for your use case.

It’s important to keep in mind that although the change records will likely be serialized at roughly the same time, there is no inherent ordering between transactions across different database partitions.

Transaction ordering between partitions
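For a consumer that does need whole distributed transactions, alignment can be sketched by grouping records on TxId and waiting until a CommitTransaction has arrived from every participating partition. The field names follow the OBSERVE columns; the record values below are simplified for illustration:

```python
# Sketch: reconstruct distributed transactions from an interleaved stream.
from collections import defaultdict

def align_transactions(records):
    """Yield (tx_id, data_records) once a transaction has committed on
    all TxPartitions partitions it touched."""
    pending = defaultdict(lambda: {"commits": set(), "rows": []})
    for rec in records:
        tx = pending[rec["TxId"]]
        if rec["Type"] == "CommitTransaction":
            tx["commits"].add(rec["PartitionId"])
            if len(tx["commits"]) == rec["TxPartitions"]:
                yield rec["TxId"], tx["rows"]
                del pending[rec["TxId"]]
        elif rec["Type"] != "BeginTransaction":
            tx["rows"].append(rec)

# A transaction spanning two partitions, records arriving interleaved:
stream = [
    {"TxId": "t1", "PartitionId": 1, "Type": "BeginTransaction", "TxPartitions": 2},
    {"TxId": "t1", "PartitionId": 2, "Type": "BeginTransaction", "TxPartitions": 2},
    {"TxId": "t1", "PartitionId": 1, "Type": "Insert", "TxPartitions": 2, "ID": 1},
    {"TxId": "t1", "PartitionId": 1, "Type": "CommitTransaction", "TxPartitions": 2},
    {"TxId": "t1", "PartitionId": 2, "Type": "Insert", "TxPartitions": 2, "ID": 2},
    {"TxId": "t1", "PartitionId": 2, "Type": "CommitTransaction", "TxPartitions": 2},
]
transactions = list(align_transactions(stream))
```

Because OBSERVE only ever emits committed data, this buffering is optional: a consumer that doesn't care about transaction boundaries can apply records as they arrive.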

All about offsets

Q: I often trip over the power cable to my data center, what happens if the stream gets disconnected?

SingleStore’s CDC stream uses Offsets to mark the points in database history where specific changes were made. These are strictly increasing, immutable values that can be provided as the starting point for an OBSERVE query. Unlike other CDC solutions, SingleStore does not use database-side cursors to track where a CDC stream left off. Instead, a client can provide any Offset it has previously observed, and the OBSERVE query will continue from that point onward.

The OBSERVE query generates one Offset for each record, strictly increasing within a given partition but with no implicit ordering between partitions. In contrast to something like a Kafka topic offset, these Offsets are intended to be managed by the client, typically by persisting them locally or in a database.

The Offsets can be specified in the OBSERVE using the BEGIN AT clause. For example:

OBSERVE * FROM foo BEGIN AT
  (0x0000000000000078000000000000000E000000000000E053,
   0x0000000000000078000000000000000F000000000000E043,
   NULL,
   0x0000000000000077000000000000000E000000000000E088);

This example is for a database with four partitions. The Offset for each partition instructs that partition to start streaming changes that logically happened after that point. An Offset is immutable in that once it has been issued, it will always point to the same logical position in the WAL, and it can even be shared between queries.
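Client-side offset bookkeeping can be sketched as follows. The begin_at_clause helper is hypothetical, and the offsets are shortened with "..." purely for readability; a real client would persist and pass the full hex values:

```python
# Sketch: rebuild a BEGIN AT clause from per-partition offsets persisted
# by the consumer. Partitions with no saved offset get NULL, meaning
# "stream from the start".

def begin_at_clause(num_partitions, saved_offsets):
    """Build a BEGIN AT tuple from a {partition_id: offset} mapping."""
    parts = [saved_offsets.get(p, "NULL") for p in range(num_partitions)]
    return "BEGIN AT (" + ", ".join(parts) + ")"

# Shortened, illustrative offsets for partitions 0, 1 and 3; partition 2
# has never been observed, so it resumes from the start:
saved = {0: "0x0000...E053", 1: "0x0000...E043", 3: "0x0000...E088"}
clause = begin_at_clause(4, saved)
```

On reconnect, the consumer simply appends this clause to its OBSERVE query and continues where it left off.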

Q: What if I am observing a large transaction and need to resume from the middle. Is there a performance cost?

Our resumable Offset implementation is designed around zero-buffering. Resuming halfway through a 1GiB transaction has similar performance characteristics to resuming from the start of a 1KiB transaction!

Q: So can I re-use the Offset?

Yes! As many times as you want across as many queries as you like. By giving consumers control over which Offsets to consider processed and when, we enable exactly-once message delivery semantics, the strictest set of guarantees available.

Q: Amazing! So I don’t need to be worried about my server not keeping up with processing the changes?

Exactly, you can always go back historically to any record you’ve processed/seen, and continue from there.
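One way to turn that replayability into exactly-once application is to persist the consumed Offset together with the target write, so a restart never re-applies work. A minimal sketch, assuming the target store can commit both atomically (the Target class and record shapes are made up for illustration):

```python
# Sketch: exactly-once application via atomically persisted offsets.

class Target:
    def __init__(self):
        self.rows = {}
        self.last_offset = None  # persisted atomically with the rows

    def apply(self, offset, key, value):
        # In a real system this would be one transaction in the target DB.
        self.rows[key] = value
        self.last_offset = offset

def consume(stream, target):
    for offset, key, value in stream:
        if target.last_offset is not None and offset <= target.last_offset:
            continue  # already applied before the crash; skip the replay
        target.apply(offset, key, value)

stream = [(1, "a", 10), (2, "b", 20), (3, "c", 30)]
target = Target()
consume(stream[:2], target)   # "crash" after two records...
consume(stream, target)       # ...restart replays from an old offset
```

The replayed records are detected by their Offsets and skipped, so every change takes effect exactly once even though it may be delivered more than once.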

Q: What if I turn off my server for a month?

Yes*, you can still continue processing the records.

Q: What’s that asterisk doing there?

Well, you still have data retention to worry about. Compared to something that purely deals with message-queue functionality, SingleStore is still fundamentally a database — which means we’re optimizing for live data and need to make trade-offs between retaining the older transaction logs that source our OBSERVE queries and consuming disk space.

Offsets will at some point be considered stale, meaning we can no longer retrieve the data found at that logical point of the WAL. In practical terms, they become stale once they are older than the oldest snapshot in the system. The snapshots_to_keep and snapshot_trigger_size variables control how many snapshots are kept and how big they are, so you as the customer have some control over the data retention window.

More information on these values can be found in the SingleStore docs.

Data format

Q: What do the change records look like?

As you, the eagle-eyed reader, have likely deduced from the preceding sample output, change records are emitted in a row-based SQL format:

Offset: 0x0000000000000003000000000000009B0000000000000E73
PartitionId: 0
Type: Insert
Table: books
TxId:
0x0100000000000000000000000000000003000000000000009B0000000000000000000000000
0000000
TxPartitions: 1
InternalId: 1152921504606846977
id: 42
title: The Lord of the Rings
author: J.R.R. Tolkien
year: 1954

This is a newly inserted row in the books table on partition 0. The underlying table columns (id, title, author and year) are present, preceded by a set of auxiliary columns:

  • Offset. Logical position in the WAL
  • PartitionId. Unique number for the partition containing the data
  • Type. Type of change record, e.g. Insert, Update, Delete, BeginTransaction, CommitTransaction
  • TxId. Unique identifier for each transaction, shared by every record that is part of that transaction. The TxId is also matched across partitions in the case of a multi-partition transaction.
  • TxPartitions. The number of partitions a transaction was applied to.
  • InternalId. Unique internal identifier (for a given table), in case no primary key was defined.
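A consumer will typically want to separate these auxiliary columns from the observed table's own columns. A small sketch, with the record values made up for illustration (the column names follow the list above):

```python
# Sketch: split a change record into CDC metadata and table payload.
from dataclasses import dataclass

AUX_COLUMNS = ("Offset", "PartitionId", "Type", "Table",
               "TxId", "TxPartitions", "InternalId")

@dataclass
class ChangeRecord:
    meta: dict   # the auxiliary CDC columns
    row: dict    # the observed table's own columns

def parse_record(record):
    meta = {k: record[k] for k in AUX_COLUMNS}
    row = {k: v for k, v in record.items() if k not in AUX_COLUMNS}
    return ChangeRecord(meta, row)

rec = parse_record({
    "Offset": "0x...0E73", "PartitionId": 0, "Type": "Insert",
    "Table": "books", "TxId": "0x...0000", "TxPartitions": 1,
    "InternalId": 1152921504606846977,
    "id": 42, "title": "The Lord of the Rings",
})
```

Keeping the split in one place also makes it easy to route records by Table or Type further down the pipeline.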

Q: Is SQL the only option? What about [insert my esoteric format here]?

Currently, AS SQL is the only formatting option available. We’re actively working on supporting a JSON formatting option, as it provides some nice traits when running OBSERVE queries on multiple tables and/or when confronted with dynamic schema changes, but more on that later. With the AS JSON formatting, the SQL column output for the underlying table data would be replaced with a single JSON column, Data:

Offset: 0x0000000000000003000000000000009B0000000000000E73
PartitionId: 0
Type: Insert
Table: books
TxId:
0x0100000000000000000000000000000003000000000000009B0000000000000000000000000
0000000
TxPartitions: 1
InternalId: 1152921504606846977
Data: '{ "id": 42, "title": "The Lord of the Rings", "author": "J.R.R.
Tolkien", "year": 1954 }'

The benefit of the JSON format is that the schema of the ResultSet is stable regardless of the number of tables, and the schemas of those tables being observed.
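That stability means one consumer loop can handle records from any table without knowing its schema up front. A sketch of what consuming the planned JSON format might look like, assuming a Data column carrying the JSON payload as shown above:

```python
# Sketch: table-agnostic decoding of JSON-formatted change records.
import json

def decode(record):
    """Return (table_name, row_dict) regardless of the table's schema."""
    return record["Table"], json.loads(record["Data"])

records = [
    {"Table": "books", "Type": "Insert",
     "Data": '{"id": 42, "title": "The Lord of the Rings"}'},
    {"Table": "authors", "Type": "Insert",
     "Data": '{"id": 7, "name": "J.R.R. Tolkien"}'},
]
decoded = [decode(r) for r in records]
```

Both records flow through the same code path even though books and authors have different columns, which is exactly the trait that makes the JSON format attractive for multi-table OBSERVE queries.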

Long-lived queries

Change data capture streams can be created using a variety of workflows. From a simple busy-polling loop, for example:

SELECT * FROM my_table WHERE created_on > (NOW() - INTERVAL 1 MINUTE)

to dedicated tables tracking change records. In SingleStore’s approach, changes are sourced directly from the WAL content. As no polling is necessary, our CDC stream is a unidirectional stream of change records, which makes the MySQL protocol an interesting fit. While SingleStore is perfectly capable of propagating a large number of rows to clients using streaming queries, most clients are by default set up to buffer rows until the query has completed. An OBSERVE query, by contrast, is not intended to terminate at all, and consuming it unbuffered provides minimal latency in pushing new change records to clients.

The best approach here is to simply treat the query as one that is likely to return a huge amount of data: use an unbuffered cursor. When using the mysql CLI, look into the --quick option.

Q: How do I stop the stream?

The same way every query is stopped, by killing it. The OBSERVE query can be explicitly stopped by using KILL QUERY, or it will be implicitly stopped once a dropped connection has been detected.

Alternatively, the END AT TAIL clause can be used to tell the OBSERVE query to stop once it reaches the canonical TAIL of the log. This isn’t a stable point in the WAL, as active writes are constantly moving this position — but it is guaranteed to be stable once any active writers are stopped. Using the END AT TAIL clause, you can transform the OBSERVE query from a long-lived stream to a polling-based query, fetching any change record between the last seen Offset and the TAIL.
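The polling transformation can be sketched with a simulated log. This is an illustrative model of the END AT TAIL semantics, not the actual engine behavior: each round fetches everything between the last seen offset and the tail as it stood when the query started.

```python
# Sketch: OBSERVE ... BEGIN AT ... END AT TAIL as a polling primitive,
# simulated over a plain list standing in for the WAL.

def observe_until_tail(log, begin_at):
    """Return (records, new_offset): everything between begin_at and the
    tail snapshotted at query start."""
    tail = len(log)            # the tail observed when the query begins
    return log[begin_at:tail], tail

log = ["rec0", "rec1"]
batch, offset = observe_until_tail(log, 0)        # fetches rec0, rec1
log += ["rec2"]                                    # active writers move the tail
batch2, offset = observe_until_tail(log, offset)   # next poll fetches rec2
```

Each call terminates instead of streaming forever, and chaining the returned offset into the next call gives a simple catch-up loop.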

Q: What if I idle the stream?

Certain DDL operations expect running queries to be stopped before proceeding. To prevent a long-running OBSERVE query from blocking these operations, there is an observe_leaf_timeout_secs global variable. It controls the maximum amount of time a connection will wait when flushing before the connection is considered closed. As the name indicates, the timeout is only used by per-partition OBSERVE queries; the aggregator’s multiplexing logic internally retries timed-out partition queries at the correct offset. This means your idle stream will stay alive without blocking DDL operations.

DDL changes

Q: DDL, good point! What happens when I change my table schema? Do I have to start over?

Thankfully no*. SingleStore supports schema evolution by letting consumers see point-in-time consistent records. If data was created or modified in a table (~V1) and the table was modified afterward (~V2), e.g. by adding or modifying a column, the original records remain available in the V1 table format. SingleStore CDC tracks not only data changes but also schema changes: alterations such as adding, modifying or removing columns, tables or databases are captured as events within the OBSERVE query. This allows clients to handle any necessary DDL events at the logical point where they take place in the CDC stream.

For example, when replicating changes to another database, the remote database can apply the same DDL event without additional coordination. This ensures that downstream systems remain synchronized with the evolving structure of your SingleStore database.

Dynamic schema support in database CDC is rare, as it is typically complex to maintain: databases generally optimize for retaining the latest shape of the data. At SingleStore, we have the unique opportunity to offer schema evolution without significant extra cost.

Q: There’s that asterisk again...

Well… someone’s paying attention. Correct: SingleStore’s CDC-out solution is only available in public preview right now, meaning we’re still gathering feedback on how customers use our technology. While CDC out is gaining increased traction, we are stabilizing our dynamic schema support — in addition to other improvements to our CDC-out subsystem — ensuring consumers like yourself have the appropriate set of tools available. If you have a strong interest in handling DDL events using SingleStore CDC, please let us know!

CDC ecosystem

Q: Thanks for explaining the OBSERVE queries, how do I set up my OBSERVE pipeline?

Hold your horses, Jack, let’s take a step back. There are any number of ways to create a CDC pipeline between two (or more) systems. This would translate into an immensely long list of configuration options for various technologies and pipelines, making it unmanageable from SingleStore’s perspective. Instead, we offer three main approaches: DIY, open source and enterprise.

DIY

The DIY option offers the most flexibility, but requires managing the offsets, any data conversion, compute provisioning, buffering and more, depending on your use case. If you want to prototype, test or get something up and running quickly, this is generally the recommended approach. For more elaborate use cases where you want to optimize for aspects such as performance, it also provides the most flexibility in handling the underlying change stream.

We’ve provided a toy example of how to implement table-to-table replication from SingleStore to any MySQL-compatible database. It describes how to start and resume OBSERVE queries, how to replicate data transactionally, and how to manage Offsets. As a toy example, however, it does not handle failure cases or manage durability or availability. The project is meant as a reference guide for implementing your own CDC consumer: CDC Code examples.

Open source

SingleStore maintains a connector for the Debezium ecosystem. Debezium is an open-source framework for managing CDC streams from various databases. It typically uses an underlying message queue to propagate change records, e.g. Apache Kafka.

The Debezium connector facilitates managing Offsets and converting the change data into a Debezium-compatible format. You can find more information on how to set up the connector in the docs.

Enterprise solution

Some CDC approaches can be hard to manage for varying reasons, including incredibly large scale and/or very strict performance requirements. For these use cases, an enterprise partner can help facilitate SingleStore’s CDC workflow, bridging the gap to an ETL platform or a different database.
