Dears,
We will be loading data to a Singlestore Columnstore table from Kafka through a procedure and pipeline. We have encountered an issue as below:
A customer uploads 3 attachments and the details of the attachment is stored in table and in the next case they upload 2 attachments .
In the end table there should only be 2 attachments.
For this case, we have written a syntax which deletes the 3 attachments and then inserts 2 attachments into the table.
This works fine in normal scenarios but when our pipeline fails and pipeline is altered from the earliest, this logic behaves abnormally.
Could you please help us a way out.
Here is the sample table, pipeline and procedure.
CREATE PIPELINE delete_testing
AS LOAD DATA KAFKA ‘10.XX.XX.XX:XXXX/deals_test’
BATCH_INTERVAL 2500
INTO PROCEDURE delete_test
FORMAT AVRO
SCHEMA REGISTRY ‘http://10.XX.XX.XX:XXXX’
(
deals_event
← %
);
CREATE OR REPLACE PROCEDURE delete_test
(deals_event_batch query(deals_event
JSON COLLATE utf8_bin NULL)) RETURNS void AS
DECLARE p_id_batch query(p_party_id
text,p_data_content_type
text);
BEGIN
p_id_batch = select DISTINCT json_extract_string(deals_event,“event”,“id”),
json_extract_string(deals_event,“datacontenttype”) from deals_event_batch as ppib;
delete from account_manager as amg
where amg.party_id in(
select DISTINCT ppie.p_party_id from p_id_batch as ppie);
INSERT INTO account_manager(party_id,id,name,email_id,role
)
SELECT
json_extract_string(json_extract_string(table_col,“partyId”),“string”),
json_extract_string(json_extract_string(table_col,“id”),“string”),
json_extract_string(json_extract_string(table_col,“name”),“string”),
json_extract_string(json_extract_string(table_col,“email”),“string”),
json_extract_string(json_extract_string(table_col,“role”),“string”)
from deals_event
left join table
(json_to_array(json_extract_string(deals_event,“event”,“organization”,“amanager”))) on true
on duplicate key update
name= values(name),
email_id= values(email_id),
role= values(role);
END;
CREATE TABLE account_manager
(
party_id
varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
id
varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
name
varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
email_id
varchar(60) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
role
varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
operation_time
timestamp(6) NULL DEFAULT CURRENT_TIMESTAMP(6),
operation_upd_time
timestamp(6) NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),
UNIQUE KEY ids
(party_id
) USING HASH,
SHARD KEY __SHARDKEY
(party_id
),
KEY party_id
(party_id
)
) AUTOSTATS_CARDINALITY_MODE=PERIODIC AUTOSTATS_HISTOGRAM_MODE=CREATE SQL_MODE=‘STRICT_ALL_TABLES’