Hi,
We’re looking into using MemSQL as an audit system: leveraging CDC with Debezium, streaming changes into Kafka, and converting the JSON on the fly into a table. We’ve done a POC and we have the concept working.
Question: (a new column is added to a table in Mysql for example)
What happens to a MemSQL pipeline when a schema change is made in MySQL, which Kafka will detect?
How can the change (the new column) be applied in MemSQL? Does the pipeline need to be dropped and recreated? If a transform is used, I presume the Python script needs to be updated as well.
Depending on how you have mapped your Debezium JSON into the database, you could handle this in the transform: open a Python connection to the master aggregator and execute the ALTER TABLE to add the new columns there, then push your JSON to a stored procedure that executes the insert, update or delete statement.
So the flow is: map only the before JSON, after JSON, source JSON and operation in the pipeline; have the transform only check for schema changes and perform the alters; and then have the stored procedure perform the load.
This process does push loading data up to the aggregator so performance is slower than a straight pipeline load.
Here is a sample stored procedure that applies the changes from Debezium.
DELIMITER $$
# handle_kafka_cdc
#
# Applies a batch of Debezium change events to the target tables by building
# and executing one dynamic DML statement per event.
#
# Parameters:
#   batch - query with one row per change event:
#             before : JSON image of the row before the change
#             after  : JSON image of the row after the change
#             source : Debezium source metadata; its "table" key names the target table
#             op     : operation code; 'd' = delete, 'u' = update, anything else
#                      (presumably Debezium 'c'/'r') is applied as an insert
#
# Errors: on any failure the current transaction is rolled back, the message is
# echoed as a one-row result set with column "error", and the exception is re-raised.
#
# NOTE(review): the JSON images are converted to SQL fragments with plain string
# REPLACE calls. This assumes the JSON is serialized without whitespace and that
# no value contains the sequences {"  ":  ,"  }  or =null. Values containing
# those sequences will produce wrong or invalid SQL.
# NOTE(review): statements are built by string concatenation from the event
# payload; only feed this procedure from a trusted topic.
CREATE OR REPLACE PROCEDURE handle_kafka_cdc(batch query(before
json, after
json, source json, op text)) AS
DECLARE
    v_table text;
    v_op char(1);
    v_where_clause text;
    v_set_clause text;
    v_pairs text;
    v_stmt_text text;
    v_error_msg text;
    arr ARRAY(record(cbefore json, cafter json, source json, op varchar(5)));
    nvp_array array(text);
    v_equal int;
    insert_columns text;
    insert_values text;
BEGIN
    # For testing only: to run against a staging table instead of a pipeline batch,
    # declare  v_json_select query(cbefore json, cafter json, source json, op varchar(5));
    # and replace the COLLECT below with:
    #   v_json_select = select cdata::before, cdata::after, cdata::source, cdata::op from cdcdata;
    #   arr = collect(v_json_select);
    arr = COLLECT(batch);

    FOR x IN arr LOOP
        # Target table name, backtick-quoted so reserved words and odd characters
        # cannot break (or inject into) the generated statement.
        v_table = JSON_EXTRACT_STRING(x.source, 'table');
        v_table = CONCAT('`', REPLACE(v_table, '`', '``'), '`');

        # assumes x.op arrives JSON-quoted (e.g. "d"), so the code is the 2nd
        # character - TODO confirm against the pipeline column mapping
        v_op = substring(x.op, 2, 1);

        IF v_op = 'd' THEN
            BEGIN
                # {"a":1,"b":"x"}  ->   a=1 and b="x"
                v_where_clause = REPLACE(
                                   REPLACE(
                                     REPLACE(
                                       REPLACE(x.cbefore, '{"', ' '),
                                       '":', '='),
                                     ',"', ' and '),
                                   '}', ' ');
                # "col = null" never matches; NULL must be tested with IS NULL
                v_where_clause = REPLACE(v_where_clause, '=null', ' is null');
                v_stmt_text = CONCAT('DELETE FROM ', v_table, ' where ', v_where_clause);
                EXECUTE IMMEDIATE v_stmt_text;
            EXCEPTION
                WHEN OTHERS THEN
                    ROLLBACK;
                    v_error_msg = CONCAT(exception_message(), '- EXCEPTION: error applying delete: ');
                    ECHO SELECT v_error_msg AS "error";
                    RAISE;
            END;

        ELSIF v_op = 'u' THEN
            BEGIN
                # SET list from the after image: {"a":1,"b":"x"}  ->   a=1 , b="x"
                v_set_clause = REPLACE(
                                 REPLACE(
                                   REPLACE(
                                     REPLACE(x.cafter, '{"', ' '),
                                     '":', '='),
                                   ',"', ' , '),
                                 '}', ' ');
                # WHERE clause from the before image, matching every column
                v_where_clause = REPLACE(
                                   REPLACE(
                                     REPLACE(
                                       REPLACE(x.cbefore, '{"', ' '),
                                       '":', '='),
                                     ',"', ' and '),
                                   '}', ' ');
                # Only the WHERE side is rewritten: "col=null" is valid in a SET list
                v_where_clause = REPLACE(v_where_clause, '=null', ' is null');
                v_stmt_text = CONCAT('update ', v_table, ' set ', v_set_clause, ' where ', v_where_clause);
                EXECUTE IMMEDIATE v_stmt_text;
            EXCEPTION
                WHEN OTHERS THEN
                    ROLLBACK;
                    v_error_msg = CONCAT(exception_message(), '- EXCEPTION: error applying update: ');
                    ECHO SELECT v_error_msg AS "error";
                    RAISE;
            END;

        ELSE
            BEGIN
                # name=value pairs from the after image, separated by a dedicated
                # token so that column names or values containing "and" are not split
                v_pairs = REPLACE(
                            REPLACE(
                              REPLACE(
                                REPLACE(x.cafter, '{"', ' '),
                                '":', '='),
                              ',"', '<nvp>'),
                            '}', ' ');
                nvp_array = split(v_pairs, '<nvp>');

                # Split each pair at its first '=' into the column list and value list
                insert_columns = '';
                insert_values = '';
                FOR y IN nvp_array LOOP
                    v_equal = instr(y, '=');
                    insert_columns = CONCAT(insert_columns, substring(y, 1, v_equal - 1), ',');
                    insert_values = CONCAT(insert_values, substring(y, v_equal + 1, length(y)), ',');
                END LOOP;
                # Drop the trailing commas
                insert_columns = substring(insert_columns, 1, length(insert_columns) - 1);
                insert_values = substring(insert_values, 1, length(insert_values) - 1);

                v_stmt_text = CONCAT('INSERT INTO ', v_table, ' (', insert_columns, ') values (', insert_values, ')');
                EXECUTE IMMEDIATE v_stmt_text;
            EXCEPTION
                WHEN OTHERS THEN
                    ROLLBACK;
                    v_error_msg = CONCAT(exception_message(), '- EXCEPTION: error applying insert: ');
                    ECHO SELECT v_error_msg AS "error";
                    # NOTE(review): relies on errors having exactly (message, timestamp)
                    # columns in that order - confirm and add an explicit column list
                    INSERT INTO errors VALUES(v_error_msg, current_timestamp());
                    RAISE;
            END;
        END IF;
    END LOOP;
END$$
DELIMITER ;