Delete-before-insert logic does not work after resetting the pipeline offsets to earliest

Dears,

We load data into a SingleStore columnstore table from Kafka through a pipeline and a stored procedure, and we have run into the following issue:

A customer uploads 3 attachments, and the details of each attachment are stored in a table. Later, the same customer uploads 2 attachments.
The final table should then contain only the 2 attachments.
To handle this, we have written logic that deletes the 3 existing attachments and then inserts the 2 new ones.
This works fine in normal operation, but when our pipeline fails and we reset it to read from the earliest offset, the logic behaves abnormally.
Could you please help us find a way out?

Here are the sample table, pipeline, and procedure.

CREATE PIPELINE delete_testing
AS LOAD DATA KAFKA '10.XX.XX.XX:XXXX/deals_test'
BATCH_INTERVAL 2500
INTO PROCEDURE delete_test
FORMAT AVRO
SCHEMA REGISTRY 'http://10.XX.XX.XX:XXXX'
(
deals_event <- %
);

CREATE OR REPLACE PROCEDURE delete_test(deals_event_batch query(deals_event JSON COLLATE utf8_bin NULL)) RETURNS void AS

DECLARE p_id_batch query(p_party_id text, p_data_content_type text);

BEGIN

-- Distinct ids (and content types) present in the current batch
p_id_batch = select DISTINCT json_extract_string(deals_event,'event','id'),
json_extract_string(deals_event,'datacontenttype') from deals_event_batch as ppib;

-- Remove the existing rows for every id that appears in this batch
delete from account_manager as amg
where amg.party_id in(
select DISTINCT ppie.p_party_id from p_id_batch as ppie);

-- Insert one row per element of the amanager array, read from the batch parameter
INSERT INTO account_manager(party_id,id,name,email_id,role)
SELECT
json_extract_string(json_extract_string(table_col,'partyId'),'string'),
json_extract_string(json_extract_string(table_col,'id'),'string'),
json_extract_string(json_extract_string(table_col,'name'),'string'),
json_extract_string(json_extract_string(table_col,'email'),'string'),
json_extract_string(json_extract_string(table_col,'role'),'string')
from deals_event_batch
left join table
(json_to_array(json_extract_string(deals_event,'event','organization','amanager'))) on true
on duplicate key update
name= values(name),
email_id= values(email_id),
role= values(role);
END;

CREATE TABLE account_manager (
party_id varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
id varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
name varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
email_id varchar(60) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
role varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
operation_time timestamp(6) NULL DEFAULT CURRENT_TIMESTAMP(6),
operation_upd_time timestamp(6) NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),
UNIQUE KEY ids (party_id) USING HASH,
SHARD KEY __SHARDKEY (party_id),
KEY party_id (party_id)
) AUTOSTATS_CARDINALITY_MODE=PERIODIC AUTOSTATS_HISTOGRAM_MODE=CREATE SQL_MODE='STRICT_ALL_TABLES';

Hi Sowmya,

Can you share in detail what you mean by altering the pipeline from the earliest, and what you mean by abnormal behavior?

Hi @srachamadugu,

Please find my responses below.

Kafka stores data sequentially, indexed by offsets, and we load that data into a SingleStore table using a pipeline.
"Earliest" means loading the data from the start of the offsets.
The query we use for this is: alter pipeline delete_testing set offsets earliest;

With the code above, the deletion does not happen as expected (as explained in the problem statement) when the pipeline is reset to earliest. That is what we meant by abnormal behavior.
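
For readers following along, the reset described above can be sketched as the sequence below. Stopping the pipeline before altering it is an assumption made here for safety; the thread itself only shows the ALTER PIPELINE statement. The last query simply lists the batch history for the pipeline, so the replayed batches can be compared with the original ones.

```sql
-- Assumption: the pipeline is stopped before its offsets are changed.
STOP PIPELINE delete_testing;

-- Statement from the thread: restart reading from the earliest available offset.
ALTER PIPELINE delete_testing SET OFFSETS EARLIEST;

START PIPELINE delete_testing;

-- Inspect the batches the pipeline has run, including the replayed ones.
SELECT *
FROM information_schema.PIPELINES_BATCHES_SUMMARY
WHERE PIPELINE_NAME = 'delete_testing';
```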

It's quite difficult to say what could be going wrong without knowing the shape of the data and the order of events.

On a replay, the Kafka offsets are read from the beginning, and Kafka may already have dropped some of the earliest offsets. Because of that, the contents of a batch may differ from the initial runs, which could cause issues if a customer's data is split across batches.
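
One way to check whether batch composition is involved is to log, from inside the procedure, every id that appears in more than one event of the same batch. The sketch below is only a diagnostic under stated assumptions: replay_debug is a hypothetical table that is not part of the original schema, and the id path ('event','id') is taken from the procedure posted above.

```sql
-- Hypothetical debug table; not part of the original schema.
CREATE TABLE IF NOT EXISTS replay_debug (
  p_party_id text,
  events_in_batch bigint,
  logged_at timestamp(6) NULL DEFAULT CURRENT_TIMESTAMP(6)
);

-- Statement to place inside delete_test, before the DELETE.
-- It records each id that occurs in more than one event of the current batch.
INSERT INTO replay_debug (p_party_id, events_in_batch)
SELECT json_extract_string(deals_event, 'event', 'id'), COUNT(*)
FROM deals_event_batch
GROUP BY json_extract_string(deals_event, 'event', 'id')
HAVING COUNT(*) > 1;
```

If rows show up in replay_debug only after a reset to earliest, the replayed batches are grouping several events for the same id. With the procedure as posted, that means a single DELETE for the id is followed by inserts from all of those events.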