Welcome, DynamoDB x Rockset developers! This technical blog will show you how to integrate your DynamoDB database with SingleStore.

The combination of DynamoDB and SingleStore is a powerful architecture that uses the former as a fast, NoSQL, OLTP layer and the latter as a scalable, multimodel OLAP layer. This augmentation architecture is very common among SingleStore developers, and some also consolidate their DynamoDB workloads directly into SingleStore.
Let’s state the facts: no other analytical database has the native DynamoDB integration that Rockset has. At SingleStore, we aim to deliver the simplest alternative to you at the lowest cost.
In this walkthrough, we focus on integrating DynamoDB with SingleStore via Apache Kafka using SingleStore Pipelines. The blog provides a step-by-step walkthrough, from initial setup to final data validation, and explains why SingleStore offers a compelling alternative for Rockset customers.
If this integration option is not quite what you need, feel free to scroll down where we detail alternatives.
In this architecture change, data moves efficiently from DynamoDB to SingleStore, leveraging Kafka for seamless, scalable streaming and SingleStore's real-time pipelines. It ensures minimal downtime, offering an ideal solution for customers migrating from Rockset to SingleStore.
Streaming data from DynamoDB to Kafka
Since you already have data in DynamoDB, the first step is streaming this data to Kafka. This allows us to seamlessly transfer data to SingleStore.
Kafka setup and topic creation
bin/kafka-topics.sh --create --topic customer_topic --bootstrap-server localhost:9092 --partitions 8
bin/kafka-topics.sh --create --topic lineitem_topic --bootstrap-server localhost:9092 --partitions 8
bin/kafka-topics.sh --create --topic orders_topic --bootstrap-server localhost:9092 --partitions 8

bin/kafka-topics.sh --list --bootstrap-server localhost:9092
Python script to stream data to Kafka
The following sample code streams data from DynamoDB to Kafka. It can be housed in SingleStore Notebooks or scheduled through SingleStore Job Service.
import boto3
import json
from kafka import KafkaProducer
from decimal import Decimal

# Initialize DynamoDB resource
dynamodb = boto3.resource('dynamodb', endpoint_url='http://localhost:8000')

# Custom JSON encoder to handle Decimal types
class DecimalEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            return float(obj)
        return super(DecimalEncoder, self).default(obj)

# Initialize Kafka producer with custom JSON encoder
producer = KafkaProducer(
    bootstrap_servers='<your_kafka_machine_ip>:9092',
    value_serializer=lambda v: json.dumps(v, cls=DecimalEncoder).encode('utf-8')
)

# Function to get the record count from a DynamoDB table
def get_table_count(table_name):
    table = dynamodb.Table(table_name)
    response = table.scan(Select='COUNT')
    count = response['Count']
    # COUNT scans are paginated as well, so sum the counts from every page
    while 'LastEvaluatedKey' in response:
        response = table.scan(Select='COUNT',
                              ExclusiveStartKey=response['LastEvaluatedKey'])
        count += response['Count']
    return count

# Function to read data from DynamoDB and stream to Kafka
def stream_table_to_kafka(table_name, topic_name):
    table = dynamodb.Table(table_name)
    record_count = 0  # Count of records streamed to Kafka

    try:
        response = table.scan()
        items = response['Items']

        for item in items:
            producer.send(topic_name, item)
            record_count += 1
            if record_count % 1000 == 0:
                print(f'{record_count} records streamed from {table_name} to {topic_name}')

        # Continue scanning if there are more items
        while 'LastEvaluatedKey' in response:
            response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
            items = response['Items']
            for item in items:
                producer.send(topic_name, item)
                record_count += 1
                if record_count % 1000 == 0:
                    print(f'{record_count} records streamed from {table_name} to {topic_name}')

        print(f'Total records streamed from {table_name} to {topic_name}: {record_count}')

        # Validate the record count
        expected_count = get_table_count(table_name)
        if record_count == expected_count:
            print(f'Successfully streamed all records from {table_name} to {topic_name}.')
        else:
            print(f'Warning: Mismatch in record count for {table_name}. '
                  f'Expected: {expected_count}, Streamed: {record_count}')

    except Exception as e:
        print(f'Error streaming data from {table_name} to {topic_name}: {e}')
    finally:
        producer.flush()  # Ensure all messages are sent before exiting

if __name__ == '__main__':
    # Define table names and corresponding Kafka topics
    tables_and_topics = {
        'customer': 'customer_topic',
        'lineitem': 'lineitem_topic',
        'orders': 'orders_topic'
    }

    try:
        # Stream data from each table to the corresponding Kafka topic one at a time
        for table_name, topic_name in tables_and_topics.items():
            print(f'Starting to stream data from {table_name} to {topic_name}')
            stream_table_to_kafka(table_name, topic_name)
            print(f'Completed streaming data from {table_name} to {topic_name}')
    except Exception as e:
        print(f'Error in streaming process: {e}')
    finally:
        producer.close()
        print('Data streaming complete.')
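The boto3 resource API returns DynamoDB numbers as Decimal values, and the DecimalEncoder in the script converts every one of them to float. A whole-number key such as 1 is therefore serialized as 1.0 in the Kafka message. If that matters for your keys, one option is an encoder that keeps whole numbers as integers. The sketch below is a hypothetical variant, not part of the original script; the names IntAwareDecimalEncoder and serialize_item and the sample item are illustrative only.

```python
import json
from decimal import Decimal


class IntAwareDecimalEncoder(json.JSONEncoder):
    """Hypothetical variant of DecimalEncoder: whole numbers stay integers."""

    def default(self, obj):
        if isinstance(obj, Decimal):
            # Whole-number Decimals (typical for keys) become int, others float
            if obj == obj.to_integral_value():
                return int(obj)
            return float(obj)
        return super().default(obj)


def serialize_item(item):
    """Serialize one DynamoDB item the way the Kafka value_serializer would."""
    return json.dumps(item, cls=IntAwareDecimalEncoder).encode('utf-8')


# Illustrative item shaped like a row of the orders table
sample_item = {
    'o_orderkey': Decimal('1'),
    'o_totalprice': Decimal('173665.47'),
    'o_orderstatus': 'O',
}
print(serialize_item(sample_item))
# b'{"o_orderkey": 1, "o_totalprice": 173665.47, "o_orderstatus": "O"}'
```

To try it in the producer, you would pass cls=IntAwareDecimalEncoder in the value_serializer lambda instead of cls=DecimalEncoder.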
Ingesting data from Kafka to SingleStore
Finally, we'll create pipelines in SingleStore to ingest data from Kafka topics into SingleStore tables. This step showcases the power and simplicity of SingleStore's real-time data pipelines.
SingleStore table DDLs and pipeline creation
CREATE DATABASE tpch PARTITIONS 8;
USE tpch;

-- pipeline and table ddls:
-- Drop tables if they exist
DROP TABLE IF EXISTS orders;
DROP TABLE IF EXISTS lineitem;
DROP TABLE IF EXISTS customer;

CREATE TABLE orders (
    o_orderkey text NOT NULL,
    o_custkey text NOT NULL,
    o_orderstatus text NOT NULL,
    o_totalprice text NOT NULL,
    o_orderdate text NOT NULL,
    o_orderpriority text NOT NULL,
    o_clerk text NOT NULL,
    o_shippriority text NOT NULL,
    o_comment text NOT NULL,
    UNIQUE KEY pk (o_orderkey) UNENFORCED RELY,
    SHARD KEY __SHARDKEY (o_orderkey),
    SORT KEY o_orderkey (o_orderkey)
) AUTOSTATS_CARDINALITY_MODE=INCREMENTAL
AUTOSTATS_HISTOGRAM_MODE=CREATE
AUTOSTATS_SAMPLING=ON
SQL_MODE='STRICT_ALL_TABLES';

CREATE TABLE lineitem (
    l_orderkey text NOT NULL,
    l_partkey text NOT NULL,
    l_suppkey text NOT NULL,
    l_linenumber text NOT NULL,
    l_quantity text NOT NULL,
    l_extendedprice text NOT NULL,
    l_discount text NOT NULL,
    l_tax text NOT NULL,
    l_returnflag text NOT NULL,
    l_linestatus text NOT NULL,
    l_shipdate text NOT NULL,
    l_commitdate text NOT NULL,
    l_receiptdate text NOT NULL,
    l_shipinstruct text NOT NULL,
    l_shipmode text NOT NULL,
    l_comment text NOT NULL,
    UNIQUE KEY pk (l_orderkey, l_linenumber) UNENFORCED RELY,
    SHARD KEY __SHARDKEY (l_orderkey),
    SORT KEY l_orderkey (l_orderkey)
) AUTOSTATS_CARDINALITY_MODE=INCREMENTAL
AUTOSTATS_HISTOGRAM_MODE=CREATE
AUTOSTATS_SAMPLING=ON
SQL_MODE='STRICT_ALL_TABLES';

CREATE TABLE customer (
    c_custkey text NOT NULL,
    c_name text NOT NULL,
    c_address text NOT NULL,
    c_nationkey text NOT NULL,
    c_phone text NOT NULL,
    c_acctbal text NOT NULL,
    c_mktsegment text NOT NULL,
    c_comment text NOT NULL,
    UNIQUE KEY pk (c_custkey) UNENFORCED RELY,
    SHARD KEY __SHARDKEY (c_custkey),
    SORT KEY c_custkey (c_custkey)
) AUTOSTATS_CARDINALITY_MODE=INCREMENTAL
AUTOSTATS_HISTOGRAM_MODE=CREATE
AUTOSTATS_SAMPLING=ON
SQL_MODE='STRICT_ALL_TABLES';

-- Create pipeline for orders table
CREATE OR REPLACE PIPELINE orders_pipeline AS
LOAD DATA KAFKA '<your_kafka_machine_ip>:9092/orders_topic'
INTO TABLE orders
(
    o_orderstatus <- o_orderstatus, o_clerk <- o_clerk,
    o_orderdate <- o_orderdate, o_shippriority <- o_shippriority,
    o_custkey <- o_custkey, o_totalprice <- o_totalprice,
    o_orderkey <- o_orderkey, o_comment <- o_comment,
    o_orderpriority <- o_orderpriority
)
FORMAT JSON;

-- Create pipeline for lineitem table
CREATE OR REPLACE PIPELINE lineitem_pipeline AS
LOAD DATA KAFKA '<your_kafka_machine_ip>:9092/lineitem_topic'
INTO TABLE lineitem
(
    l_orderkey <- l_orderkey,
    l_partkey <- l_partkey, l_suppkey <- l_suppkey,
    l_linenumber <- l_linenumber, l_quantity <- l_quantity,
    l_extendedprice <- l_extendedprice,
    l_discount <- l_discount, l_tax <- l_tax,
    l_returnflag <- l_returnflag, l_linestatus <- l_linestatus,
    l_shipdate <- l_shipdate,
    l_commitdate <- l_commitdate, l_receiptdate <- l_receiptdate,
    l_shipinstruct <- l_shipinstruct, l_shipmode <- l_shipmode,
    l_comment <- l_comment
)
FORMAT JSON;

-- Create pipeline for customer table
CREATE OR REPLACE PIPELINE customer_pipeline AS
LOAD DATA KAFKA '<your_kafka_machine_ip>:9092/customer_topic'
INTO TABLE customer
(
    c_custkey <- c_custkey,
    c_name <- c_name,
    c_address <- c_address,
    c_nationkey <- c_nationkey,
    c_phone <- c_phone,
    c_acctbal <- c_acctbal,
    c_mktsegment <- c_mktsegment,
    c_comment <- c_comment
)
FORMAT JSON;

start pipeline orders_pipeline;
start pipeline customer_pipeline;
start pipeline lineitem_pipeline;
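The three pipeline statements follow one pattern: each JSON key is mapped to the column of the same name, and the topic and pipeline names are derived from the table name. If you have many tables, a small generator can reduce copy-and-paste mistakes. This is a minimal sketch under those naming assumptions; build_pipeline_sql is a hypothetical helper, not a SingleStore API, and it only renders text for you to review and run yourself.

```python
def build_pipeline_sql(table_name, columns, kafka_host):
    """Render a CREATE PIPELINE statement in the style used in this walkthrough.

    Assumes the topic is named <table>_topic, the pipeline <table>_pipeline,
    and that every JSON key has the same name as its target column.
    """
    mapping = ',\n    '.join(f'{col} <- {col}' for col in columns)
    return (
        f"CREATE OR REPLACE PIPELINE {table_name}_pipeline AS\n"
        f"LOAD DATA KAFKA '{kafka_host}:9092/{table_name}_topic'\n"
        f"INTO TABLE {table_name}\n"
        f"(\n    {mapping}\n)\n"
        f"FORMAT JSON;"
    )


customer_columns = [
    'c_custkey', 'c_name', 'c_address', 'c_nationkey',
    'c_phone', 'c_acctbal', 'c_mktsegment', 'c_comment',
]
customer_sql = build_pipeline_sql('customer', customer_columns,
                                  '<your_kafka_machine_ip>')
print(customer_sql)
```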
You can use the following query to check the timings for your pipeline.
select
    pipeline_name PipelineName
    ,max(start_time) StartTime
    ,format(sum(rows_streamed),0) RowsStreamed
    ,format(sum(mb_streamed),2) MBStreamed
    ,format(sum(batch_time),0) BatchTimeSec
    ,format(sum(batch_time) / 60, 2) BatchTimeMin
    ,format(sum(rows_streamed) / sum(batch_time),0) RowsperSec
    ,format(sum(mb_streamed) / sum(batch_time),2) MBperSec
from
    information_schema.pipelines_batches_summary
where
    database_name = 'ai' -- replace with your database name
    and pipeline_name = 'orders_pipeline' -- replace with your pipeline name
group by
    pipeline_name
order by
    sum(rows_streamed) / sum(batch_time) desc;
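For the final data validation mentioned in the introduction, one simple check is to compare the row counts in SingleStore with the counts you expect from DynamoDB. The following is a minimal sketch, assuming you can obtain a Python DB-API 2.0 cursor for your database. SingleStore is MySQL wire compatible, so a MySQL-compatible driver should provide one; the in-memory SQLite database here is only a stand-in so the example runs without a cluster. The helper names count_rows and validate_counts and the sample counts are hypothetical.

```python
import sqlite3

# Tables created in this walkthrough; used as an allow-list because
# table names cannot be passed as bound query parameters
KNOWN_TABLES = ('customer', 'lineitem', 'orders')


def count_rows(cursor, table_name):
    """Return the number of rows in table_name using a DB-API cursor."""
    if table_name not in KNOWN_TABLES:
        raise ValueError(f'Unexpected table name: {table_name}')
    cursor.execute(f'SELECT COUNT(*) FROM {table_name}')
    return cursor.fetchone()[0]


def validate_counts(cursor, expected_counts):
    """Return {table: (expected, actual)} for every table whose counts differ."""
    mismatches = {}
    for table_name, expected in expected_counts.items():
        actual = count_rows(cursor, table_name)
        if actual != expected:
            mismatches[table_name] = (expected, actual)
    return mismatches


# Stand-in database: SQLite in memory instead of a SingleStore connection
conn = sqlite3.connect(':memory:')
cur = conn.cursor()
for name in KNOWN_TABLES:
    cur.execute(f'CREATE TABLE {name} (k TEXT)')
cur.executemany('INSERT INTO orders VALUES (?)', [('1',), ('2',)])
cur.execute("INSERT INTO customer VALUES ('1')")

# Expected counts would come from DynamoDB, for example get_table_count()
result = validate_counts(cur, {'customer': 1, 'lineitem': 0, 'orders': 3})
print(result)  # {'orders': (3, 2)}
```

An empty dictionary means every table matched. Because pipelines load continuously, it is worth running the check only after the producer script has finished and the pipelines have caught up.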
Batch load Rockset data
Now that your real-time streams are set up, you will want to bulk load the rest of your data from Rockset. Since you’re already familiar with SingleStore pipelines by now, this part is super easy!
To move your data directly from Rockset to SingleStore using SingleStore Pipelines, follow these simple steps:
Step 1
Select the data from Rockset into S3 (or other object store). The following example shows the export of data into S3 in PARQUET format. You can also specify JSON format.
INSERT INTO 's3://analyticsdata/query1'
INTEGRATION = 's3export'
FORMAT = (TYPE='PARQUET', INCLUDE_QUERY_ID=true)
SELECT * FROM commons.analytics
Step 2
Once the data is exported to S3 using the preceding command, you can set up the SingleStore pipeline directly from S3. Check out our documentation for a more detailed reference.
CREATE PIPELINE <Pipeline_name> AS
LOAD DATA S3 's3://<bucket_name>/<filename>.parquet'
CONFIG '{"region":"us-west-2"}'
CREDENTIALS '{"aws_access_key_id": "XXXXXXXXXXXXXXXXX",
              "aws_secret_access_key": "XXXXXXXXXXX"}'
INTO TABLE keypairs
FORMAT PARQUET
(`key` <- keypairs::`key`,
 `value` <- keypairs::`value`);

START PIPELINE <Pipeline_name>;
Once the pipeline loading is complete, we are done with the snapshot load stage of the migration.
Alternatives
We have more ways to replicate DynamoDB data to SingleStore:
- DynamoDB to SingleStore via AWS DMS. We have a few customers (like Particl) doing this in production today.
- DynamoDB to SingleStore via AWS SQS and Lambda functions. Lambda functions can transform messages in batches and generate SQL inserts, or update queries that can be run directly against the database.
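For the SQS and Lambda option, the step that turns a batch of messages into SQL might look like the following minimal sketch. It assumes each SQS message body is a flat JSON object whose keys match the target table's columns, and that all messages in a batch share the same keys. The function name build_insert, the customer default and the sample event are hypothetical.

```python
import json


def build_insert(sqs_event, table_name='customer'):
    """Turn one SQS batch (as delivered to Lambda) into a parameterized INSERT.

    Returns (sql, params), where params holds one tuple per message.
    """
    # Each SQS record delivered to Lambda carries its payload in 'body'
    rows = [json.loads(record['body'])
            for record in sqs_event.get('Records', [])]
    if not rows:
        return None, []

    # Assumption: every message has the same keys as the first one
    columns = sorted(rows[0])
    placeholders = ', '.join(['%s'] * len(columns))
    sql = (f"INSERT INTO {table_name} ({', '.join(columns)}) "
           f"VALUES ({placeholders})")
    params = [tuple(row.get(col) for col in columns) for row in rows]
    return sql, params


# Illustrative event with two messages
sample_event = {'Records': [
    {'body': json.dumps({'c_custkey': '1', 'c_name': 'Customer#1'})},
    {'body': json.dumps({'c_custkey': '2', 'c_name': 'Customer#2'})},
]}
sql, params = build_insert(sample_event)
print(sql)     # INSERT INTO customer (c_custkey, c_name) VALUES (%s, %s)
print(params)  # [('1', 'Customer#1'), ('2', 'Customer#2')]
```

Inside a real handler, the pair could be passed to cursor.executemany(sql, params) on a MySQL-compatible DB-API driver that uses the %s placeholder style. Keeping the values as parameters instead of formatting them into the SQL string avoids quoting problems.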
Conclusion
Migrating from Rockset to SingleStore is a seamless process, thanks to SingleStore's robust and efficient real-time data pipelines. By following the steps outlined in this blog, you can easily integrate your existing DynamoDB system, ensuring minimal downtime and maximum efficiency.
SingleStore not only simplifies the migration process but also offers superior performance and real-time analytics database capabilities, making it an ideal choice for Rockset customers looking to enhance their data management solutions.
Remember, with SingleStore, you're not just migrating your data — you're upgrading your entire data infrastructure to a more powerful and scalable solution.