Welcome, DynamoDB x Rockset developers! This blog will show you how to easily set up CDC from your Amazon DynamoDB database to SingleStore using DynamoDB streams and a Lambda function.
Step 1. Enable DynamoDB streams
Here are the steps to enable DynamoDB streams on a given table:
- Open the DynamoDB console
- Choose “Tables”
- Choose the table you want to set up the stream for
- Under “Exports and streams,” choose “DynamoDB stream details”
- Choose “Turn on”
- For “View type,” you’ll be prompted with a range of options; choose the one that meets your requirements
- Select “Turn on stream”
Once complete, your DynamoDB streams box should look like this.
Step 2. Set up the SingleStore database
Now that we have everything set up on the DynamoDB side, let’s set up our SingleStore Helios® environment. For this blog, we’ll assume you’ve already created a workspace group (which determines your AWS region), created a workspace in SingleStore Helios and granted IP access between the workspace group and your AWS account. With that in place, create a database and a table with a single JSON column to hold the raw stream payloads:
```sql
CREATE DATABASE demo;
USE demo;
DROP TABLE IF EXISTS transactions;
CREATE TABLE transactions (json_col JSON);
```
Step 3. Create a Lambda function to consume the stream and write to SingleStore
Now that we have the DynamoDB stream set up, we need to create a Lambda function that is invoked and executed each time an event is pushed into the DynamoDB stream. To create the Lambda function, navigate to the Lambda console and select ‘Create function.’ From there, you will be taken to a window like the one in the screenshot here.
In our example, we’re using the ‘Author from scratch’ creation method and have kept the default options (except for selecting Python 3.12 as our runtime).
In the newly created Lambda function console, scroll down to the code section. Here, you can write the code to be executed each time new data from the DynamoDB stream invokes the Lambda function.
We’re going to keep our example pretty simple and load the JSON payload directly into a JSON column in SingleStore, so there is no need for data transformations. We can define how the Lambda function pushes data from DynamoDB with the following Python code:
```python
import json
import pymysql

# SingleStore connection details
SINGLESTORE_HOST = 'singlestore_host_connection_string'
SINGLESTORE_USER = 'user_name'
SINGLESTORE_PASSWORD = 'user_password'
SINGLESTORE_DB = 'db_name'


def connect_singlestore():
    return pymysql.connect(
        host=SINGLESTORE_HOST,
        user=SINGLESTORE_USER,
        password=SINGLESTORE_PASSWORD,
        db=SINGLESTORE_DB,
        cursorclass=pymysql.cursors.DictCursor
    )


def lambda_handler(event, context):
    for record in event['Records']:
        print(record)
        if record['eventSource'] == 'aws:dynamodb':
            print(event)
            connection = connect_singlestore()
            print("Connected to SingleStore")
            event_name = record['eventName']
            if event_name == 'INSERT':
                # Load the full NewImage payload into the JSON column
                try:
                    with connection.cursor() as cursor:
                        json_str = json.dumps(record['dynamodb']['NewImage'])
                        sql = """INSERT INTO transactions (json_col) VALUES ('{}')""".format(json_str)
                        cursor.execute(sql)
                        connection.commit()
                except Exception as e:
                    print(f"Error: {str(e)}")
                finally:
                    connection.close()
                    print("Connection closed")
            elif event_name == 'REMOVE':
                # Delete the row whose JSON id matches the deleted key
                delete_key = record['dynamodb']['Keys']['id']['N']
                print(delete_key)
                try:
                    with connection.cursor() as cursor:
                        sql = """DELETE FROM transactions WHERE json_col::id::N = {};""".format(delete_key)
                        cursor.execute(sql)
                        connection.commit()
                except Exception as e:
                    print(f"Error: {str(e)}")
                finally:
                    connection.close()
                    print("Connection closed")
            elif event_name == 'MODIFY':
                # Update the total_cost value inside the JSON document
                update_key = record['dynamodb']['Keys']['id']['N']
                print(update_key)
                print(record)
                update_value = record['dynamodb']['NewImage']['total_cost']['N']
                print(update_value)
                try:
                    with connection.cursor() as cursor:
                        sql = """UPDATE transactions SET json_col::total_cost::N = '"{}"' WHERE json_col::id::N = '"{}"';""".format(update_value, update_key)
                        cursor.execute(sql)
                        connection.commit()
                except Exception as e:
                    print(f"Error: {str(e)}")
                finally:
                    connection.close()
                    print("Connection closed")
            else:
                continue
        else:
            continue
    return {
        'statusCode': 200,
        'body': json.dumps('Data processed successfully')
    }
```
This Lambda function handles new inserts, updates and deletes from the DynamoDB table and pushes the changes into our SingleStore table. We’ve stuck with basic update logic for our demo purposes, but you can add any logic in your Lambda function to handle application-specific scenarios.
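As a reference for the field names used above, a stream record delivered to the handler looks roughly like this (an illustrative, trimmed example with hypothetical values; real records include additional metadata such as eventID, and DynamoDB encodes every attribute under a type key like N):

```json
{
  "eventName": "INSERT",
  "eventSource": "aws:dynamodb",
  "dynamodb": {
    "Keys": { "id": { "N": "1" } },
    "NewImage": {
      "id": { "N": "1" },
      "total_cost": { "N": "42.57" },
      "transaction_at": { "N": "1700000000" }
    }
  }
}
```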
Note: if you need to import any libraries that are not pre-installed in the Lambda environment, you’ll need to create a Lambda Layer to manage those dependencies for the function.
Step 4. Connect your DynamoDB stream to the Lambda function
The final step on the DynamoDB side is to create a trigger connecting your DynamoDB stream to the newly created Lambda function. To do this, navigate back to the table where you set up the stream; at the bottom of the “Exports and streams” tab, there will be a “Triggers” box. Select “Create trigger” and fill in the details on the next page.
Now our Lambda function will be invoked by CDC events in our DynamoDB stream.
Step 5. Test out the connection from DynamoDB to SingleStore
Now we get to the fun part! Let’s test out what we’ve built with some data. To do this, we’re going to run three separate Python scripts, one for each operation (insert, update and delete).
Insert data
First, let’s test inserting data into DynamoDB to see if it flows through to SingleStore. The following script generates 10 synthetic transactions and writes them to DynamoDB.
```python
import boto3
import json
import time
from random import randint, uniform
from decimal import Decimal

# Initialize DynamoDB client
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
table_name = 'transactions'
table = dynamodb.Table(table_name)


def generate_transaction(i):
    transaction_id = i
    total_cost = Decimal(str(round(uniform(10.0, 100.0), 2)))
    items_purchased = [randint(1, 10) for _ in range(randint(1, 5))]
    transaction_at = int(time.time())
    return {
        'id': transaction_id,
        'total_cost': total_cost,
        'transaction_at': transaction_at
    }


def write_to_dynamodb(transaction):
    response = table.put_item(
        Item={
            'id': transaction['id'],
            'total_cost': transaction['total_cost'],
            'transaction_at': transaction['transaction_at']
        }
    )
    print(f"Successfully inserted transaction with id: {transaction['id']}")
    return response


# Generate and write synthetic transactions
for i in range(10):
    transaction_data = generate_transaction(i)
    print(transaction_data)
    write_to_dynamodb(transaction_data)
```
After running the script, confirm all the transactions have been replicated to SingleStore by doing a count on the transactions table.
```sql
SELECT COUNT(*) FROM transactions;
```
Great! So we’ve got inserts working. Let’s inspect what the data looks like before moving on to updates.
```sql
SELECT * FROM transactions;
```
The data is being stored as JSON blobs. We can parse out individual key-value pairs with the :: operator and access them in our desired SQL format, as shown below.
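For example, a query like the following (an illustrative sketch using the fields from our synthetic transactions) pulls individual values out of the JSON document:

```sql
SELECT
    json_col::id::N AS id,
    json_col::total_cost::N AS total_cost
FROM transactions;
```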
Note: if you want to pull specific key-value pairs out into their own columns when writing to SingleStore, a great way to do this is with SingleStore persisted computed columns. Alternatively, you could parse them out in the Lambda function and insert them directly into individual columns.
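As a minimal sketch, assuming the transactions table above and that numeric attributes arrive under the N type key, a persisted computed column might look something like this:

```sql
-- Hypothetical example: surface total_cost from the JSON payload as its own persisted column
ALTER TABLE transactions
    ADD COLUMN total_cost_extracted AS json_col::total_cost::$N PERSISTED TEXT;
```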
Update
To test if updates to DynamoDB are flowing through, let’s update the total_cost value to 0 where the id value is 1. The following script makes this change to DynamoDB.
```python
import boto3
import json
import time
from random import randint, uniform
from decimal import Decimal
from botocore.exceptions import ClientError

# Initialize DynamoDB client
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
table_name = 'transactions'
table = dynamodb.Table(table_name)


def update_transaction(transaction_id, total_cost=None, items_purchased=None, transaction_at=None):
    # Build the update expression and attribute values dynamically
    update_expression = "SET"
    expression_attribute_values = {}
    expression_attribute_names = {}

    if total_cost is not None:
        update_expression += " #tc = :total_cost,"
        expression_attribute_values[":total_cost"] = total_cost
        expression_attribute_names["#tc"] = "total_cost"

    # Remove trailing comma
    update_expression = update_expression.rstrip(',')

    try:
        response = table.update_item(
            Key={'id': transaction_id},
            UpdateExpression=update_expression,
            ExpressionAttributeValues=expression_attribute_values,
            ExpressionAttributeNames=expression_attribute_names,
            ReturnValues="UPDATED_NEW"
        )
        print("UpdateItem succeeded:")
        print(response)
    except ClientError as e:
        print(e.response['Error']['Message'])


# Example usage
if __name__ == '__main__':
    transaction_id = 1
    new_total_cost = Decimal(0)
    update_transaction(transaction_id, new_total_cost)
```
After successfully running the script, let’s check if the update flowed through to SingleStore.
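One simple way to check (an illustrative query, matching the quoting convention the Lambda function uses when it writes JSON values):

```sql
SELECT json_col::total_cost::N AS total_cost
FROM transactions
WHERE json_col::id::N = '"1"';
```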
Great, now we’ve got inserts and updates working!
Delete
Finally, let’s delete all the records to clean up our tables. The following script loops through all the records in DynamoDB and deletes them.
```python
import boto3
import json
import time
from random import randint, uniform
from decimal import Decimal

# Initialize DynamoDB client
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
table_name = 'transactions'
table = dynamodb.Table(table_name)


def delete_all_items():
    # Scan the table to get all items
    scan = table.scan()
    with table.batch_writer() as batch:
        for each in scan['Items']:
            batch.delete_item(
                Key={
                    'id': each['id']  # Replace with your table's primary key
                }
            )
    print(f"Deleted {len(scan['Items'])} items from {table_name}")


# Main execution
if __name__ == '__main__':
    delete_all_items()
```
After running the script, confirm the records were deleted in SingleStore:
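For example, the same count query from earlier should now return 0:

```sql
SELECT COUNT(*) FROM transactions;
```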
We’ve now set up inserts, updates and deletes and are one step closer to migrating our application completely.
Conclusion
Leveraging DynamoDB streams and a Lambda function allows you to quickly set up CDC from DynamoDB into SingleStore in a manner that might feel familiar to Rockset developers. This will likely allow for a quicker migration now, before you potentially explore SingleStore Pipelines from Kafka, which can infer schema.
However you choose to ingest data from DynamoDB to find your Rockset alternative, SingleStore lets you deliver the real-time analytics and contextualization behind your application while handling complex parameterized queries. SingleStore is purpose-built to handle real-time transactions, analytics and search — and we’ve done this repeatedly at enterprise scale.
Ready to start your Rockset migration? Chat with a SingleStore Field Engineer today.