Hi, I am a beginner working on a POC. I am trying to load Avro data from Confluent Cloud Kafka into SingleStore DB, but I am unable to do so.
My Avro schema in Confluent Cloud is as follows:
{
  "doc": "Schema for Employee records",
  "fields": [
    {
      "doc": "Employee ID",
      "name": "emp_id",
      "type": "int"
    },
    {
      "doc": "Employee first name",
      "name": "emp_fname",
      "type": "string"
    },
    {
      "doc": "Employee last name",
      "name": "emp_lname",
      "type": "string"
    }
  ],
  "name": "Employee",
  "namespace": "com.example",
  "type": "record"
}
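For reference, a record that conforms to this schema would look like the following (illustrative values, not my actual data):

{"emp_id": 1, "emp_fname": "Jane", "emp_lname": "Doe"}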
I am using SingleStore on localhost and did the following.
I wrote a stored procedure as follows:
DELIMITER //
CREATE OR REPLACE PROCEDURE employeedetail(q QUERY(emp_id INT, emp_fname TEXT, emp_lname TEXT)) AS
BEGIN
  INSERT INTO employee
  SELECT * FROM q;
END //
DELIMITER ;
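For completeness, the target employee table that the procedure inserts into is a plain three-column table, roughly along these lines (a minimal sketch; the column types are assumed to match the procedure's QUERY parameter):

CREATE TABLE IF NOT EXISTS employee (
  emp_id INT,
  emp_fname TEXT,
  emp_lname TEXT
);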
I wrote a pipeline as follows:
CREATE OR REPLACE PIPELINE p
AS LOAD DATA KAFKA 'cloud gcp ip/topic_name'
CONFIG '{"sasl.username": "username",
"sasl.mechanism": "PLAIN",
"security.protocol": "SASL_SSL"}'
CREDENTIALS '{"sasl.password": "password"}'
INTO PROCEDURE employeedetail FORMAT AVRO SCHEMA REGISTRY 'cloud url'
(emp_id <- emp_id, emp_fname <- emp_fname, emp_lname <- emp_lname);
When I run TEST PIPELINE p, I get the following error:
test pipeline p
ERROR 2341 ER_FOR_UPDATE_SUBQUERY: Leaf Error (localhost:3306): Leaf Error (127.0.0.1:3307): Missing Confluent byte in record header. This input is most likely not schema registry aware Avro. See Load Data · SingleStore Documentation for details on the accepted Avro sub-formats.
Kindly help me with this.