I'm trying to create a Kafka pipeline that consumes JSON from a Kafka topic, but I'm having a problem inserting the elements of a list into SingleStoreDB. Below is my JSON:
I have 2 tables, Customer and Characteristic.
The Customer table correctly receives the id and name. In the Characteristic table, however, only the object with id=3 (the last one in the list) is being saved.
Hello @Carlao, thanks for trying out JSON pipelines. Since characteristic is a JSON array, you need to reference each element explicitly by its position. Consider this pipeline for reference:
Here the JSON data has this form: {"name":"metric1","values":[1612396800, "1.0"]}
CREATE PIPELINE `metrics`
AS LOAD DATA S3 '...'
INTO TABLE `t`
FORMAT JSON
(
`t`.`name` <- `name`,
@val <- `values` DEFAULT null,
`t`.`data` <- %
)
SET
-- reference each array element explicitly by its 0-based position
`ts` = json_extract_bigint(@val, 0),
`int_val` = json_extract_bigint(@val, 1)
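Before wiring positional extraction into a pipeline, it can help to try the same expressions interactively on a literal copy of the sample document. A minimal sketch, assuming you run it directly in a SQL client connected to SingleStoreDB; note that element 1 of the sample array is the string "1.0", not a number, which is why it is read here with JSON_EXTRACT_STRING.

```sql
-- Evaluate the positional extractions on the sample array from the post.
-- Index 0 is the timestamp (a JSON number); index 1 is a JSON string.
SELECT
  JSON_EXTRACT_BIGINT('[1612396800, "1.0"]', 0) AS ts,
  JSON_EXTRACT_STRING('[1612396800, "1.0"]', 1) AS val;
```

This only works when the array has a fixed, known length, since each position needs its own expression.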
Of course, if the JSON array has a variable number of elements, you need to load the pipeline into a stored procedure (as you are doing in your example), convert the JSON array into an array of JSON objects with JSON_TO_ARRAY, and iterate over that array. There is a good example of this in our documentation (SingleStoreDB Cloud · SingleStore Documentation):
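Applied to the Customer/Characteristic case, the approach could look roughly like the sketch below. It is not a tested implementation: the procedure name load_customers, the pipeline name customers_pipeline, the Kafka broker/topic string, the CUSTOMER columns (id, name) and the JSON key names (id, name, characteristic, valueType, @type) are all hypothetical, inferred from the question. The key point is that each array element is first assigned to its own JSON variable before its fields are extracted.

```sql
DELIMITER //
-- Hypothetical procedure: one pipeline batch arrives as rows with a single JSON column.
CREATE OR REPLACE PROCEDURE load_customers(batch QUERY(doc JSON)) AS
DECLARE
  v_chars ARRAY(JSON) NULL;   -- the "characteristic" list as an array of JSON objects
  v_char JSON NULL;           -- the current element of that list
  v_customer_id BIGINT;
BEGIN
  FOR r IN COLLECT(batch) LOOP
    v_customer_id = JSON_EXTRACT_BIGINT(r.doc, 'id');
    INSERT INTO CUSTOMER (id, name)
    VALUES (v_customer_id, JSON_EXTRACT_STRING(r.doc, 'name'));

    -- Convert the JSON array into ARRAY(JSON) so it can be iterated by index.
    v_chars = JSON_TO_ARRAY(JSON_EXTRACT_JSON(r.doc, 'characteristic'));
    IF NOT ISNULL(v_chars) THEN
      FOR i IN 0 .. LENGTH(v_chars) - 1 LOOP
        v_char = v_chars[i];  -- assign the element first, then extract its fields
        INSERT INTO CHARACTERISTIC (characteristic_id, characteristic_name, value_type, characteristic_type, customer_id)
        VALUES (v_char::$id, v_char::$name, v_char::$valueType,
                JSON_EXTRACT_STRING(v_char, '@type'), v_customer_id);
      END LOOP;
    END IF;
  END LOOP;
END //
DELIMITER ;

-- Hypothetical pipeline: map each whole Kafka message to the procedure's doc column.
CREATE PIPELINE customers_pipeline
AS LOAD DATA KAFKA 'kafka-host:9092/customers-topic'
INTO PROCEDURE load_customers
FORMAT JSON
(doc <- %);
```

Because every element gets its own INSERT, all entries of the list end up in CHARACTERISTIC rather than only the last one.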
DELIMITER //
-- Format an ARRAY(JSON) as a string such as "Values: [...]"
CREATE OR REPLACE FUNCTION array_as_string(a ARRAY(JSON) NULL)
RETURNS VARCHAR(255) AS
DECLARE result VARCHAR(255);
BEGIN
IF isnull(a) THEN
result = "NULL";
ELSE
result = "Values: [";
FOR i IN 0 .. LENGTH(a) - 1 LOOP
IF i < LENGTH(a) - 1 THEN
result = CONCAT(result, a[i], ", ");
ELSE
result = CONCAT(result, a[i], "");
END IF;
END LOOP;
END IF;
RETURN CONCAT(result, "]");
END //
-- Convert a JSON array into an ARRAY(JSON) and return it formatted as a string
CREATE OR REPLACE FUNCTION udf_json_to_array(js JSON NULL)
RETURNS VARCHAR(255) AS
DECLARE
jsonArray array(JSON) NULL;
result VARCHAR(255);
BEGIN
jsonArray = JSON_TO_ARRAY(js);
result = array_as_string(jsonArray);
return result;
END //
DELIMITER ;
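Once both functions exist, they can be called directly to check how a JSON array is split into elements. A minimal usage sketch; the second literal is a hypothetical array of objects shaped like a "characteristic" list.

```sql
-- Each element of the JSON array becomes one entry of the ARRAY(JSON).
SELECT udf_json_to_array('[1, 2, 3]') AS formatted;

-- Objects are kept whole as JSON elements.
SELECT udf_json_to_array('[{"id": 1}, {"id": 2}, {"id": 3}]') AS formatted;
```

Both functions return VARCHAR(255), so output for long arrays will be cut off; they are meant for inspecting the conversion, not for loading data.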
By the way, this doesn't work (the indexed ::$ accesses in the VALUES clause):
for i in 0 .. length(v_customer_characteristic_array) - 1 loop
insert into CHARACTERISTIC (characteristic_id, characteristic_name, value_type, characteristic_type, customer_id)
values (v_characteristic[i]::$id, v_characteristic[i]::$name, v_characteristic[i]::$valueType, v_characteristic[i]::$@type, v_customer_id);
end loop;
To make it work, I had to assign each element to a new variable inside the loop:
v_characteristic = v_customer_characteristic_array[i];
and then use that variable in the VALUES clause:
values (v_characteristic::$id, v_characteristic::$name, v_characteristic::$valueType, JSON_EXTRACT_STRING(v_characteristic, '@type'), v_customer_id);