Integrating DynamoDB + SingleStore

3 min read

Jun 27, 2024

Welcome, DynamoDB x Rockset developers! This technical blog will show you how to integrate your DynamoDB database with SingleStore.

The combination of DynamoDB and SingleStore is a powerful architecture that uses the former as a fast NoSQL OLTP layer and the latter as a scalable, multimodel OLAP layer. This augmentation architecture is very common among SingleStore developers, and some of them go further and consolidate their DynamoDB workload directly into SingleStore.

Let’s state the facts: no other analytical database has the native integration with DynamoDB that Rockset has. At SingleStore, we aim to deliver the simplest alternative to you at the lowest cost.

In this walkthrough, we will discuss how to integrate DynamoDB with SingleStore via Apache Kafka using SingleStore Pipelines. The blog provides a step-by-step walkthrough, from initial setup to final data validation, and explains why SingleStore offers a compelling alternative for Rockset customers.

If this integration option is not quite what you need, feel free to scroll down where we detail alternatives.

In this architecture change, data moves efficiently from DynamoDB to SingleStore, leveraging Kafka for seamless, scalable streaming and SingleStore's real-time pipelines. It ensures minimal downtime, offering an ideal solution for customers migrating from Rockset to SingleStore. 

Streaming data from DynamoDB to Kafka

Since you already have data in DynamoDB, the first step is streaming this data to Kafka. This allows us to seamlessly transfer data to SingleStore.

Kafka setup and topic creation

bin/kafka-topics.sh --create --topic customer_topic --bootstrap-server localhost:9092 --partitions 8
bin/kafka-topics.sh --create --topic lineitem_topic --bootstrap-server localhost:9092 --partitions 8
bin/kafka-topics.sh --create --topic orders_topic --bootstrap-server localhost:9092 --partitions 8
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

Python script to stream data to Kafka
The following sample code streams data from DynamoDB to Kafka. It can be housed in SingleStore Notebooks or scheduled through SingleStore Job Service.

import boto3
import json
from kafka import KafkaProducer
from decimal import Decimal

# Initialize DynamoDB resource (endpoint_url points at a local endpoint;
# adjust or remove it for your environment)
dynamodb = boto3.resource('dynamodb', endpoint_url='http://localhost:8000')

# Custom JSON encoder to handle Decimal types
class DecimalEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            return float(obj)
        return super().default(obj)

# Initialize Kafka producer with custom JSON encoder
producer = KafkaProducer(
    bootstrap_servers='<your_kafka_machine_ip>:9092',
    value_serializer=lambda v: json.dumps(v, cls=DecimalEncoder).encode('utf-8')
)

# Function to get the record count from a DynamoDB table
def get_table_count(table_name):
    table = dynamodb.Table(table_name)
    # A COUNT scan is paginated like any other scan, so sum every page
    response = table.scan(Select='COUNT')
    count = response['Count']
    while 'LastEvaluatedKey' in response:
        response = table.scan(Select='COUNT',
                              ExclusiveStartKey=response['LastEvaluatedKey'])
        count += response['Count']
    return count

# Function to read data from DynamoDB and stream to Kafka
def stream_table_to_kafka(table_name, topic_name):
    table = dynamodb.Table(table_name)
    record_count = 0  # Count of records streamed to Kafka
    try:
        response = table.scan()
        while True:
            for item in response['Items']:
                producer.send(topic_name, item)
                record_count += 1
                if record_count % 1000 == 0:
                    print(f'{record_count} records streamed from {table_name} to {topic_name}')
            # Continue scanning if there are more items
            if 'LastEvaluatedKey' not in response:
                break
            response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
        print(f'Total records streamed from {table_name} to {topic_name}: {record_count}')
        # Validate the record count
        expected_count = get_table_count(table_name)
        if record_count == expected_count:
            print(f'Successfully streamed all records from {table_name} to {topic_name}.')
        else:
            print(f'Warning: Mismatch in record count for {table_name}. '
                  f'Expected: {expected_count}, Streamed: {record_count}')
    except Exception as e:
        print(f'Error streaming data from {table_name} to {topic_name}: {e}')
    finally:
        producer.flush()  # Ensure all messages are sent before exiting

if __name__ == '__main__':
    # Define table names and corresponding Kafka topics
    tables_and_topics = {
        'customer': 'customer_topic',
        'lineitem': 'lineitem_topic',
        'orders': 'orders_topic'
    }
    try:
        # Stream data from each table to the corresponding Kafka topic one at a time
        for table_name, topic_name in tables_and_topics.items():
            print(f'Starting to stream data from {table_name} to {topic_name}')
            stream_table_to_kafka(table_name, topic_name)
            print(f'Completed streaming data from {table_name} to {topic_name}')
    except Exception as e:
        print(f'Error in streaming process: {e}')
    finally:
        producer.close()
        print('Data streaming complete.')
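
One caveat about the encoder in the script above: boto3 returns every DynamoDB number as a Decimal, and casting each one to float turns a whole-number key such as 1 into 1.0 in the JSON that reaches Kafka. The following is a minimal sketch of one alternative, assuming you would rather keep whole numbers as integers. The class name IntPreservingEncoder and the helper to_json are hypothetical and not part of the original script.

```python
import json
from decimal import Decimal


class IntPreservingEncoder(json.JSONEncoder):
    """Hypothetical variant of DecimalEncoder that keeps whole numbers as ints."""

    def default(self, obj):
        if isinstance(obj, Decimal):
            # Whole numbers (order keys, line numbers) stay integers;
            # everything else falls back to float, as in the original encoder.
            if obj.is_finite() and obj == obj.to_integral_value():
                return int(obj)
            return float(obj)
        return super().default(obj)


def to_json(item):
    """Serialize one DynamoDB item (a dict) to a JSON string."""
    return json.dumps(item, cls=IntPreservingEncoder)


if __name__ == "__main__":
    # Sample values are made up for illustration.
    print(to_json({"o_orderkey": Decimal("1"), "o_totalprice": Decimal("173665.47")}))
```

To try it in the script, you could pass cls=IntPreservingEncoder to json.dumps in the producer's value_serializer. Since the target tables in this walkthrough store every column as text, the main effect is on how the values read once they land.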

Ingesting data from Kafka to SingleStore

Next, we'll create pipelines in SingleStore to ingest data from Kafka topics into SingleStore tables. This step showcases the power and simplicity of SingleStore's real-time data pipelines.

SingleStore table DDLs and pipeline creation

CREATE DATABASE tpch PARTITIONS 8;
USE tpch;
-- pipeline and table ddls:
-- Drop tables if they exist
DROP TABLE IF EXISTS orders;
DROP TABLE IF EXISTS lineitem;
DROP TABLE IF EXISTS customer;
CREATE TABLE orders (
    o_orderkey text NOT NULL,
    o_custkey text NOT NULL,
    o_orderstatus text NOT NULL,
    o_totalprice text NOT NULL,
    o_orderdate text NOT NULL,
    o_orderpriority text NOT NULL,
    o_clerk text NOT NULL,
    o_shippriority text NOT NULL,
    o_comment text NOT NULL,
    UNIQUE KEY pk (o_orderkey) UNENFORCED RELY,
    SHARD KEY __SHARDKEY (o_orderkey),
    SORT KEY o_orderkey (o_orderkey)
) AUTOSTATS_CARDINALITY_MODE=INCREMENTAL
  AUTOSTATS_HISTOGRAM_MODE=CREATE
  AUTOSTATS_SAMPLING=ON
  SQL_MODE='STRICT_ALL_TABLES';

CREATE TABLE lineitem (
    l_orderkey text NOT NULL,
    l_partkey text NOT NULL,
    l_suppkey text NOT NULL,
    l_linenumber text NOT NULL,
    l_quantity text NOT NULL,
    l_extendedprice text NOT NULL,
    l_discount text NOT NULL,
    l_tax text NOT NULL,
    l_returnflag text NOT NULL,
    l_linestatus text NOT NULL,
    l_shipdate text NOT NULL,
    l_commitdate text NOT NULL,
    l_receiptdate text NOT NULL,
    l_shipinstruct text NOT NULL,
    l_shipmode text NOT NULL,
    l_comment text NOT NULL,
    UNIQUE KEY pk (l_orderkey, l_linenumber) UNENFORCED RELY,
    SHARD KEY __SHARDKEY (l_orderkey),
    SORT KEY l_orderkey (l_orderkey)
) AUTOSTATS_CARDINALITY_MODE=INCREMENTAL
  AUTOSTATS_HISTOGRAM_MODE=CREATE
  AUTOSTATS_SAMPLING=ON
  SQL_MODE='STRICT_ALL_TABLES';

CREATE TABLE customer (
    c_custkey text NOT NULL,
    c_name text NOT NULL,
    c_address text NOT NULL,
    c_nationkey text NOT NULL,
    c_phone text NOT NULL,
    c_acctbal text NOT NULL,
    c_mktsegment text NOT NULL,
    c_comment text NOT NULL,
    UNIQUE KEY pk (c_custkey) UNENFORCED RELY,
    SHARD KEY __SHARDKEY (c_custkey),
    SORT KEY c_custkey (c_custkey)
) AUTOSTATS_CARDINALITY_MODE=INCREMENTAL
  AUTOSTATS_HISTOGRAM_MODE=CREATE
  AUTOSTATS_SAMPLING=ON
  SQL_MODE='STRICT_ALL_TABLES';

-- Create pipeline for orders table
CREATE OR REPLACE PIPELINE orders_pipeline AS
LOAD DATA KAFKA '<your_kafka_machine_ip>:9092/orders_topic'
INTO TABLE orders
(
    o_orderkey <- o_orderkey,
    o_custkey <- o_custkey,
    o_orderstatus <- o_orderstatus,
    o_totalprice <- o_totalprice,
    o_orderdate <- o_orderdate,
    o_orderpriority <- o_orderpriority,
    o_clerk <- o_clerk,
    o_shippriority <- o_shippriority,
    o_comment <- o_comment
)
FORMAT JSON;

-- Create pipeline for lineitem table
CREATE OR REPLACE PIPELINE lineitem_pipeline AS
LOAD DATA KAFKA '<your_kafka_machine_ip>:9092/lineitem_topic'
INTO TABLE lineitem
(
    l_orderkey <- l_orderkey,
    l_partkey <- l_partkey,
    l_suppkey <- l_suppkey,
    l_linenumber <- l_linenumber,
    l_quantity <- l_quantity,
    l_extendedprice <- l_extendedprice,
    l_discount <- l_discount,
    l_tax <- l_tax,
    l_returnflag <- l_returnflag,
    l_linestatus <- l_linestatus,
    l_shipdate <- l_shipdate,
    l_commitdate <- l_commitdate,
    l_receiptdate <- l_receiptdate,
    l_shipinstruct <- l_shipinstruct,
    l_shipmode <- l_shipmode,
    l_comment <- l_comment
)
FORMAT JSON;

-- Create pipeline for customer table
CREATE OR REPLACE PIPELINE customer_pipeline AS
LOAD DATA KAFKA '<your_kafka_machine_ip>:9092/customer_topic'
INTO TABLE customer
(
    c_custkey <- c_custkey,
    c_name <- c_name,
    c_address <- c_address,
    c_nationkey <- c_nationkey,
    c_phone <- c_phone,
    c_acctbal <- c_acctbal,
    c_mktsegment <- c_mktsegment,
    c_comment <- c_comment
)
FORMAT JSON;

START PIPELINE orders_pipeline;
START PIPELINE customer_pipeline;
START PIPELINE lineitem_pipeline;
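
The three pipeline definitions above follow one pattern: each table column is mapped from the JSON key of the same name. As a sketch of that pattern, the hypothetical helper below (build_pipeline_sql is our own illustration, not part of SingleStore or any library) generates the same kind of statement from a table name and a column list, which may help if you have many tables to migrate. It assumes the <table>_topic and <table>_pipeline naming used in this walkthrough and only builds the SQL text; running it is left to your own client.

```python
def build_pipeline_sql(table, columns, broker):
    """Build a CREATE PIPELINE statement mapping each column to the JSON key of the same name.

    Hypothetical helper: topic and pipeline names are assumed to follow the
    <table>_topic / <table>_pipeline convention used in this walkthrough.
    """
    mappings = ",\n".join(f"    {col} <- {col}" for col in columns)
    return (
        f"CREATE OR REPLACE PIPELINE {table}_pipeline AS\n"
        f"LOAD DATA KAFKA '{broker}/{table}_topic'\n"
        f"INTO TABLE {table}\n"
        f"(\n{mappings}\n)\n"
        "FORMAT JSON;"
    )


if __name__ == "__main__":
    customer_columns = [
        "c_custkey", "c_name", "c_address", "c_nationkey",
        "c_phone", "c_acctbal", "c_mktsegment", "c_comment",
    ]
    # The broker address is an assumption; use your own Kafka host and port.
    print(build_pipeline_sql("customer", customer_columns, "localhost:9092"))
```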

You can use the following query to check the timings for your pipeline.

select
    pipeline_name PipelineName
    ,max(start_time) StartTime
    ,format(sum(rows_streamed),0) RowsStreamed
    ,format(sum(mb_streamed),2) MBStreamed
    ,format(sum(batch_time),0) BatchTimeSec
    ,format(sum(batch_time) / 60, 2) BatchTimeMin
    ,format(sum(rows_streamed) / sum(batch_time),0) RowsperSec
    ,format(sum(mb_streamed) / sum(batch_time),2) MBperSec
from
    information_schema.pipelines_batches_summary
where
    database_name = 'tpch' -- replace with your database name
    and pipeline_name = 'orders_pipeline' -- replace with your pipeline name
group by
    pipeline_name
order by
    sum(rows_streamed) / sum(batch_time) desc;
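
Note that the throughput columns in the query above are ratios of sums (total rows divided by total batch time), not averages of per-batch rates, so long batches weigh more than short ones. Here is a small sketch of the same arithmetic in Python. The function name pipeline_throughput is hypothetical and the batch numbers are made up purely for illustration.

```python
def pipeline_throughput(batches):
    """Compute overall throughput from (rows_streamed, mb_streamed, batch_time_sec) tuples.

    Mirrors sum(rows_streamed) / sum(batch_time) and
    sum(mb_streamed) / sum(batch_time) from the SQL query.
    Returns None when there is no batch time to divide by.
    """
    total_rows = sum(rows for rows, _, _ in batches)
    total_mb = sum(mb for _, mb, _ in batches)
    total_time = sum(seconds for _, _, seconds in batches)
    if total_time == 0:
        return None
    return {
        "rows_per_sec": total_rows / total_time,
        "mb_per_sec": total_mb / total_time,
    }


if __name__ == "__main__":
    # Two illustrative batches: one small and slow, one large and fast.
    sample = [(1000, 2.0, 1.0), (9000, 18.0, 3.0)]
    print(pipeline_throughput(sample))
```

With these sample numbers the per-batch rates are 1,000 and 3,000 rows per second, while the ratio of sums is 10,000 rows over 4 seconds, or 2,500 rows per second.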

Batch load Rockset data

Now that your real-time streams are set up, you will want to bulk load the rest of your data from Rockset. Since you’re already familiar with SingleStore pipelines by now, this part is super easy!

To move your data directly from Rockset to SingleStore using SingleStore Pipelines, follow these simple steps:

Step 1
Select the data from Rockset into S3 (or other object store). The following example shows the export of data into S3 in PARQUET format. You can also specify JSON format.

INSERT INTO 's3://analyticsdata/query1'
INTEGRATION = 's3export'
FORMAT = (TYPE='PARQUET', INCLUDE_QUERY_ID=true)
SELECT * FROM commons.analytics

Step 2
Once the data is exported to S3 using the preceding command, you can set up the SingleStore pipeline directly from S3.
Check out our documentation for a more detailed reference.

CREATE PIPELINE <Pipeline_name> AS
LOAD DATA S3 's3://<bucket_name>/<filename>.parquet'
CONFIG '{"region":"us-west-2"}'
CREDENTIALS '{"aws_access_key_id": "XXXXXXXXXXXXXXXXX", "aws_secret_access_key": "XXXXXXXXXXX"}'
INTO TABLE keypairs
FORMAT PARQUET
(`key` <- keypairs::`key`,
 `value` <- keypairs::`value`);
START PIPELINE <Pipeline_name>;

Once the pipeline loading is complete, we are done with the snapshot load stage of the migration.

Alternatives

We have more ways to replicate DynamoDB data to SingleStore:

  1. DynamoDB to SingleStore via AWS DMS. We have a few customers (like Particl) doing this in production today.
  2. DynamoDB to SingleStore via AWS SQS and Lambda functions. Lambda functions can transform messages in batches and generate SQL insert or update queries that can be run directly against the database.
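
As a rough sketch of the second option, the function below turns one batch of SQS messages, as delivered to a Lambda function, into a single parameterized multi-row INSERT. Everything specific here is an assumption for illustration: each message body is taken to be a JSON object keyed by column name, the target is the customer table from this walkthrough, and the helper name build_batch_insert is hypothetical. Executing the statement (for example with a MySQL-compatible Python client, which is where the %s placeholder style comes from) is left out.

```python
import json

# Column list assumed to match the customer table defined earlier.
CUSTOMER_COLUMNS = [
    "c_custkey", "c_name", "c_address", "c_nationkey",
    "c_phone", "c_acctbal", "c_mktsegment", "c_comment",
]


def build_batch_insert(event, table="customer", columns=CUSTOMER_COLUMNS):
    """Turn an SQS-triggered Lambda event into (sql, params) for one multi-row INSERT.

    Hypothetical helper: assumes every record body is a JSON object that
    contains all of the listed columns. Returns (None, []) for an empty batch.
    """
    rows = [json.loads(record["body"]) for record in event.get("Records", [])]
    if not rows:
        return None, []
    row_placeholder = "(" + ", ".join(["%s"] * len(columns)) + ")"
    sql = (
        f"INSERT INTO {table} ({', '.join(columns)}) VALUES "
        + ", ".join([row_placeholder] * len(rows))
    )
    # Values are passed as strings because the walkthrough's tables use text columns.
    params = [str(row[col]) for row in rows for col in columns]
    return sql, params


if __name__ == "__main__":
    # Made-up two-column example to keep the output short.
    event = {"Records": [{"body": json.dumps({"c_custkey": 1, "c_name": "Customer#1"})}]}
    print(build_batch_insert(event, columns=["c_custkey", "c_name"]))
```

Building one statement per batch keeps the number of round trips to the database low, which is the main reason to transform messages in batches rather than one at a time.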

Conclusion

Migrating from Rockset to SingleStore is a seamless process, thanks to SingleStore's robust and efficient real-time data pipelines. By following the steps outlined in this blog, you can easily integrate your existing DynamoDB system, ensuring minimal downtime and maximum efficiency.

SingleStore not only simplifies the migration process but also offers superior performance and real-time analytics database capabilities, making it an ideal choice for Rockset customers looking to enhance their data management solutions.

Remember, with SingleStore, you're not just migrating your data — you're upgrading your entire data infrastructure to a more powerful and scalable solution.

