Welcome, DynamoDB x Rockset developers! This technical blog will show you how to integrate your DynamoDB database with SingleStore.
The combination of DynamoDB and SingleStore is a powerful architecture that leverages the former as a fast, NoSQL, OLTP layer — and the latter as a scalable, multimodel OLAP layer. This augmentation architecture is very common for SingleStore developers; you will also find some consolidate their DynamoDB workload directly into SingleStore.
Let’s state the facts — there are no analytical databases out there that have the native integration to DynamoDB that Rockset has. At SingleStore, we aim to deliver the most simple alternative to you at the lowest cost.
In this walkthrough, we will specifically be discussing how to integrate DynamoDB with SingleStore via Apache Kafka using SingleStore Pipelines. The blog provides a step-by-step walkthrough — from initial setup to final data validation — emphasizing why SingleStore offers a compelling alternative for Rockset customers.
If this integration option is not quite what you need, feel free to scroll down where we detail alternatives.
In this architecture change, data moves efficiently from DynamoDB to SingleStore, leveraging Kafka for seamless, scalable streaming and SingleStore's real-time pipelines. It ensures minimal downtime, offering an ideal solution for customers migrating from Rockset to SingleStore.
Streaming data from DynamoDB to Kafka
Since you already have data in DynamoDB, the first step is streaming this data to Kafka. This allows us to seamlessly transfer data to SingleStore.
Kafka setup and topic creation
bin/kafka-topics.sh --create --topic customer_topic --bootstrap-serverlocalhost:9092 --partitions 8bin/kafka-topics.sh --create --topic lineitem_topic --bootstrap-serverlocalhost:9092 --partitions 8bin/kafka-topics.sh --create --topic orders_topic --bootstrap-serverlocalhost:9092 --partitions 8bin/kafka-topics.sh --list --bootstrap-server localhost:9092
Python script to stream data to Kafka
The following is sample code to use to stream data from Kafka. This code can be housed in SingleStore Notebooks, or scheduled through SingleStore Job Service.
import boto3import jsonfrom kafka import KafkaProducerfrom decimal import Decimal# Initialize DynamoDB resourcedynamodb = boto3.resource('dynamodb', endpoint_url='http://localhost:8000')# Custom JSON encoder to handle Decimal typesclass DecimalEncoder(json.JSONEncoder):def default(self, obj):if isinstance(obj, Decimal):return float(obj)return super(DecimalEncoder, self).default(obj)# Initialize Kafka producer with custom JSON encoderproducer = KafkaProducer(bootstrap_servers='<your_kafak_machine_ip>:9092',value_serializer=lambda v: json.dumps(v,cls=DecimalEncoder).encode('utf-8'))# Function to get the record count from a DynamoDB tabledef get_table_count(table_name):table = dynamodb.Table(table_name)response = table.scan(Select='COUNT')return response['Count']# Function to read data from DynamoDB and stream to Kafkadef stream_table_to_kafka(table_name, topic_name):table = dynamodb.Table(table_name)record_count = 0 # Count of records streamed to Kafkatry:response = table.scan()items = response['Items']for item in items:producer.send(topic_name, item)record_count += 1if record_count % 1000 == 0:print(f'{record_count} records streamed from {table_name} to{topic_name}')# Continue scanning if there are more itemswhile 'LastEvaluatedKey' in response:response =table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])items = response['Items']for item in items:producer.send(topic_name, item)record_count += 1if record_count % 1000 == 0:print(f'{record_count} records streamed from {table_name}to {topic_name}')print(f'Total records streamed from {table_name} to {topic_name}:{record_count}')# Validate the record countexpected_count = get_table_count(table_name)if record_count == expected_count:print(f'Successfully streamed all records from {table_name} to{topic_name}.')else:print(f'Warning: Mismatch in record count for {table_name}.Expected: {expected_count}, Streamed: {record_count}')except Exception as e:print(f'Error streaming data from {table_name} to {topic_name}: {e}')finally:producer.flush() # Ensure all messages are sent before exitingif __name__ == '__main__':# Define table names and corresponding Kafka topicstables_and_topics = {'customer': 'customer_topic','lineitem': 'lineitem_topic','orders': 'orders_topic'}try:# Stream data from each table to the corresponding Kafka topic one at atimefor table_name, topic_name in tables_and_topics.items():print(f'Starting to stream data from {table_name} to {topic_name}')stream_table_to_kafka(table_name, topic_name)print(f'Completed streaming data from {table_name} to{topic_name}')except Exception as e:print(f'Error in streaming process: {e}')finally:producer.close()print('Data streaming complete.')
Ingesting data from Kafka to SingleStore
Finally, we'll create pipelines in SingleStore to ingest data from Kafka topics into SingleStore tables. This step showcases the power and simplicity of SingleStore's real-time data pipelines.
SingleStore table DDLs and pipeline creation
CREATE DATABASE tpch PARTITIONS 8;-- pipeline and table ddls:-- Drop tables if they existDROP TABLE IF EXISTS orders;DROP TABLE IF EXISTS lineitem;DROP TABLE IF EXISTS customer;CREATE TABLE orders (o_orderkey text NOT NULL,o_custkey text NOT NULL,o_orderstatus text NOT NULL,o_totalprice text NOT NULL,o_orderdate text NOT NULL,o_orderpriority text NOT NULL,o_clerk text NOT NULL,o_shippriority text NOT NULL,o_comment text NOT NULL,UNIQUE KEY pk (o_orderkey) UNENFORCED RELY,SHARD KEY __SHARDKEY (o_orderkey),SORT KEY o_orderkey (o_orderkey)) AUTOSTATS_CARDINALITY_MODE=INCREMENTALAUTOSTATS_HISTOGRAM_MODE=CREATEAUTOSTATS_SAMPLING=ONSQL_MODE='STRICT_ALL_TABLES';CREATE TABLE lineitem (l_orderkey text NOT NULL,l_partkey text NOT NULL,l_suppkey text NOT NULL,l_linenumber text NOT NULL,l_quantity text NOT NULL,l_extendedprice text NOT NULL,l_discount text NOT NULL,l_tax text NOT NULL,l_returnflag text NOT NULL,l_linestatus text NOT NULL,l_shipdate text NOT NULL,l_commitdate text NOT NULL,l_receiptdate text NOT NULL,l_shipinstruct text NOT NULL,l_shipmode text NOT NULL,l_comment text NOT NULL,UNIQUE KEY pk (l_orderkey, l_linenumber) UNENFORCED RELY,SHARD KEY __SHARDKEY (l_orderkey),SORT KEY l_orderkey (l_orderkey)) AUTOSTATS_CARDINALITY_MODE=INCREMENTALAUTOSTATS_HISTOGRAM_MODE=CREATEAUTOSTATS_SAMPLING=ONSQL_MODE='STRICT_ALL_TABLES';CREATE TABLE customer (c_custkey text NOT NULL,c_name text NOT NULL,c_address text NOT NULL,c_nationkey text NOT NULL,c_phone text NOT NULL,c_acctbal text NOT NULL,c_mktsegment text NOT NULL,c_comment text NOT NULL,UNIQUE KEY pk (c_custkey) UNENFORCED RELY,SHARD KEY __SHARDKEY (c_custkey),SORT KEY c_custkey (c_custkey)) AUTOSTATS_CARDINALITY_MODE=INCREMENTALAUTOSTATS_HISTOGRAM_MODE=CREATEAUTOSTATS_SAMPLING=ONSQL_MODE='STRICT_ALL_TABLES';-- Create pipeline for orders tableCREATE OR REPLACE PIPELINE orders_pipeline ASLOAD DATA KAFKA '<your_kafak_machine_ip>:9092/orders_topic'INTO TABLE orders(o_orderstatus <- o_orderstatus, o_clerk <- o_clerk,o_orderdate <- o_orderdate,o_shippriority <- o_shippriority,o_custkey <- o_custkey,o_totalprice <- o_totalprice,o_orderkey <- o_orderkey,o_comment <- o_comment,o_orderpriority <- o_orderpriority)FORMAT JSON;-- Create pipeline for lineitem tableCREATE OR REPLACE PIPELINE lineitem_pipeline ASLOAD DATA KAFKA '<your_kafak_machine_ip>:9092/lineitem_topic'INTO TABLE lineitem(l_orderkey <- l_orderkey,l_partkey <- l_partkey,l_suppkey <- l_suppkey,l_linenumber <- l_linenumber,l_quantity <- l_quantity,l_extendedprice <- l_extendedprice,l_discount <- l_discount,l_tax <- l_tax,l_returnflag <- l_returnflag,l_linestatus <- l_linestatus,l_shipdate <- l_shipdate,l_commitdate <- l_commitdate,l_receiptdate <- l_receiptdate,l_shipinstruct <- l_shipinstruct,l_shipmode <- l_shipmode,l_comment <- l_comment)FORMAT JSON;-- Create pipeline for customer tableCREATE OR REPLACE PIPELINE customer_pipeline ASLOAD DATA KAFKA '<your_kafak_machine_ip>:9092/customer_topic'INTO TABLE customer(c_custkey <- c_custkey,c_name <- c_name,c_address <- c_address,c_nationkey <- c_nationkey,c_phone <- c_phone,c_acctbal <- c_acctbal,c_mktsegment <- c_mktsegment,c_comment <- c_comment)FORMAT JSON;start pipeline orders_pipeline;start pipeline customer_pipeline;start pipeline lineitem_pipeline;
You can use the following query to check the timings for your pipeline.
selectpipeline_name PipelineName,max(start_time) StartTime,format(sum(rows_streamed),0) RowsStreamed,format(sum(mb_streamed),2) MBStreamed,format(sum(batch_time),0) BatchTimeSec,format(sum(batch_time) / 60, 2) BatchTimeMin,format(sum(rows_streamed) / sum(batch_time),0) RowsperSec,format(sum(mb_streamed) / sum(batch_time),2) MBperSecfrominformation_schema.pipelines_batches_summarywheredatabase_name = 'ai' -- replace with your database nameand pipeline_name = 'orders_pipeline' -- replace with your pipeline namegroup bypipeline_nameorder bysum(rows_streamed) / sum(batch_time) desc;
Batch load Rockset data
Now that your real-time streams are set up, you will want to bulk load the rest of your data from Rockset. Since you’re already familiar with SingleStore pipelines by now, this part is super easy!
To move your data directly from Rockset to SingleStore using SingleStore Pipelines, follow these simple steps:
Step 1
Select the data from Rockset into S3 (or other object store). The following example shows the export of data into S3 in PARQUET format. You can also specify JSON format.
INSERT INTO 's3://analyticsdata/query1'INTEGRATION = 's3export'FORMAT = (TYPE='PARQUET', INCLUDE_QUERY_ID=true)SELECT * FROM commons.analytics
Step 2
Once the data is exported to S3 using the preceding command, you can set up the SingleStore pipeline directly from S3. Check out our documentation for a more detailed reference.
CREATE PIPELINE <Pipeline_name> ASLOAD DATA S3 's3://<bucket_name>/<filename>.json'CONFIG '{"region":"us-west-2"}'CREDENTIALS '{"aws_access_key_id": "XXXXXXXXXXXXXXXXX","aws_secret_access_key": "XXXXXXXXXXX"}'INTO TABLE keypairsFORMAT PARQUET(`key` <- keypairs::`key`,`value` <- keypairs::`value`);START PIPELINE Pipeline_name;
Once the pipeline loading is complete, we are done with the snapshot load stage of the migration.
Alternatives
We have more ways to replicate DynamoDB data to SingleStore:
- DynamoDB to SingleStore via AWS DMS. We have a few customers (like Particl) doing this in production today.
- DynamoDB to SingleStore via AWS SQS and Lambda functions. Lambda functions can transform messages in batches and generate SQL inserts, or update queries that can be run directly against the database.
Conclusion
Migrating from Rockset to SingleStore is a seamless process, thanks to SingleStore's robust and efficient real-time data pipelines. By following the steps outlined in this blog, you can easily integrate your existing DynamoDB system, ensuring minimal downtime and maximum efficiency.
SingleStore not only simplifies the migration process but also offers superior performance and real-time analytics database capabilities, making it an ideal choice for Rockset customers looking to enhance their data management solutions.
Remember, with SingleStore, you're not just migrating your data — you're upgrading your entire data infrastructure to a more powerful and scalable solution.