I am sending this data to a Kafka topic:
{
  "id": 3,
  "name": "abhi",
  "houseName": "xyz",
  "visitorId": [
    821494,
    821551,
    844766
  ]
}
I want this data to go to two different tables: people and visitor.
uqh1 = CONCAT(id, '-', name, '-', houseName)
people {
id, name, housename, uqh1
}
uqh2 = CONCAT(id, '-', name, '-', houseName, '-', visitorId)
visitor{
uqh1, uqh2, visitorId
}
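For reference, the target tables look roughly like this (a simplified sketch; the exact types and keys here are only indicative, not my real DDL):

-- simplified sketch of the target tables, types are indicative
CREATE TABLE people (
  id BIGINT,
  name TEXT,
  housename TEXT,
  unique_hash TEXT   -- uqh1
);

CREATE TABLE visitor (
  uqh1 TEXT,
  uqh2 TEXT,
  visitor_id BIGINT
);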
I am doing something like this.
CREATE OR REPLACE PIPELINE `visitor_pipeline`
AS LOAD DATA KAFKA "bootstrap_server/visitor_topic"
INTO PROCEDURE visitor_migration
(visitor <- %)
FORMAT JSON;
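To check what the pipeline extracts before it reaches the procedure, I believe something like this can be used (assuming TEST PIPELINE also works for pipelines that load into a procedure):

TEST PIPELINE visitor_pipeline LIMIT 1;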
DELIMITER //
CREATE OR REPLACE PROCEDURE visitor_migration (batch QUERY(visitor JSON))
AS
DECLARE uqh1 TEXT;
DECLARE uqh2 TEXT;
DECLARE visitors JSON;
DECLARE vid BIGINT;
BEGIN
FOR entry IN collect(batch) LOOP
uqh1 = CONCAT(entry::id, '-', entry::name, '-', entry::houseName);
visitors = entry::visitorId;
INSERT IGNORE INTO people (
id, name, housename, unique_hash
) SELECT entry::id, entry::name, entry::houseName, uqh1;
FOR i IN 0 .. LENGTH(visitors) - 1 LOOP
vid = visitors[i];
uqh2 = CONCAT(uqh1, '-', vid);
INSERT IGNORE INTO visitor (uqh1, visitor_id, uqh2) SELECT uqh1, vid, uqh2;
END LOOP;
END LOOP;
END //
DELIMITER ;
This is the error I am getting:
A non-scalar value entry is used as a scalar.
I can't even pass an array type in the procedure's batch QUERY parameter, since it only accepts scalar data types.
I don't want to send the visitor data one event at a time instead of as an array; that would put a heavy load on the system, since I might end up sending 1,000 to 10,000 events instead of one.
How can I achieve this?
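To be concrete, what I am hoping is possible is something along these lines inside the procedure. This is only a sketch: the entry.visitor field access and the JSON_LENGTH / JSON_EXTRACT_BIGINT calls are my assumptions from the docs, not tested code.

-- Sketch only; entry.visitor field access and these JSON functions are assumptions, not tested.
FOR entry IN collect(batch) LOOP
    uqh1 = CONCAT(entry.visitor::$id, '-', entry.visitor::$name, '-', entry.visitor::$houseName);
    visitors = entry.visitor::visitorId;              -- still a JSON array
    INSERT IGNORE INTO people (id, name, housename, unique_hash)
        SELECT entry.visitor::$id, entry.visitor::$name, entry.visitor::$houseName, uqh1;
    FOR i IN 0 .. JSON_LENGTH(visitors) - 1 LOOP
        vid = JSON_EXTRACT_BIGINT(visitors, i);       -- one visitor id per row
        uqh2 = CONCAT(uqh1, '-', vid);
        INSERT IGNORE INTO visitor (uqh1, visitor_id, uqh2) SELECT uqh1, vid, uqh2;
    END LOOP;
END LOOP;

Is something like this the right approach, or is there a better way to expand the array while keeping the data batched?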