Hi,
I’m trying to store messages consumed from a Kafka topic using a MemSQL pipeline and a stored procedure.
a) The pipeline is subscribed to the Kafka topic.
b) The MemSQL pipeline triggers the stored procedure.
Everything works fine for the positive scenarios. However, when a corrupted message arrives from the Kafka topic, the remaining valid messages in the same batch are ignored.
I tried processing the batch in a loop only when a corrupt message is received, but that didn’t work either. The remaining valid messages are still ignored.
For example:
If I receive 3 messages in a batch and 1 of them is invalid, I expect the two valid records to be inserted into TABLE_1 and the invalid one to go into the Errors table.
Instead, the invalid message ends up in the Errors table, but the two valid messages are not inserted into any table.
Queries:
delimiter //
CREATE OR REPLACE PROCEDURE log_errors(sample_table_1 QUERY(a JSON, b JSON))
AS
DECLARE
  arr ARRAY(RECORD(a JSON, b JSON)) = COLLECT(sample_table_1);
  _col_1 VARCHAR(50);
  _col_2 VARCHAR(50);
  _col_3 VARCHAR(50);
BEGIN
  FOR r IN arr LOOP
    _col_1 = r.a::$col_1;
    _col_2 = r.a::$col_2;
    _col_3 = r.a::$col_3;
    INSERT INTO TABLE_1 (COL_1, COL_2, COL_3) VALUES (_col_1, _col_2, _col_3);
  END LOOP;
EXCEPTION
  WHEN OTHERS THEN
    DECLARE
      tmp TEXT = exception_message();
    BEGIN
      INSERT INTO ERRORS VALUES (_col_1, tmp, current_timestamp());
    END;
END //
CREATE OR REPLACE PROCEDURE parse_procedure_1(sample_table_1 QUERY(a JSON, b JSON))
AS
BEGIN
  INSERT INTO TABLE_1 (COL_1, COL_2, COL_3)
  SELECT DISTINCT b::$col_1, b::$col_2, b::$col_3
  FROM sample_table_1 tble;
EXCEPTION
  WHEN OTHERS THEN
    CALL log_errors(sample_table_1);
END //
delimiter ;
CREATE PIPELINE my_pipeline AS
LOAD DATA KAFKA '*:9092/'
INTO PROCEDURE parse_procedure_1
FORMAT JSON (
  a <- a DEFAULT NULL,
  b <- b DEFAULT NULL
);
Note: Please ignore any syntax errors in the queries above.
Since multiple transactions are not allowed in the pipeline and stored procedure combination, please let me know of an alternative for this.
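To make the expected behavior concrete, here is a rough, untested sketch of the per-message handling I have in mind. It assumes that a nested BEGIN ... EXCEPTION block inside the loop catches a failure for a single message and lets the loop continue with the next one. The name log_errors_per_row is hypothetical and only used for illustration; TABLE_1 and ERRORS are the same tables as above.

```sql
delimiter //
-- Hypothetical variant of log_errors (not part of my current setup).
-- Assumption: the nested block's handler catches errors for one row only,
-- so the remaining rows in the batch are still processed.
CREATE OR REPLACE PROCEDURE log_errors_per_row(sample_table_1 QUERY(a JSON, b JSON))
AS
DECLARE
  arr ARRAY(RECORD(a JSON, b JSON)) = COLLECT(sample_table_1);
  _col_1 VARCHAR(50);
  _col_2 VARCHAR(50);
  _col_3 VARCHAR(50);
BEGIN
  FOR r IN arr LOOP
    -- One nested block per message: a failure here should reach only this handler.
    BEGIN
      _col_1 = r.a::$col_1;
      _col_2 = r.a::$col_2;
      _col_3 = r.a::$col_3;
      INSERT INTO TABLE_1 (COL_1, COL_2, COL_3) VALUES (_col_1, _col_2, _col_3);
    EXCEPTION
      WHEN OTHERS THEN
        DECLARE
          tmp TEXT = exception_message();
        BEGIN
          -- Record the failing message and carry on with the next one.
          INSERT INTO ERRORS VALUES (_col_1, tmp, current_timestamp());
        END;
    END;
  END LOOP;
END //
delimiter ;
```

With the 3-message example above, the outcome I'm hoping for is two rows in TABLE_1 and one row in ERRORS. I have not confirmed that this works when the procedure is called from a pipeline.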
Thanks
Arun