I create a table as follow
CREATE TABLE my_table ( MANDT varchar(3) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, RETAILSTOREID varchar(10) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, BUSINESSDAYDATE varchar(8) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, TRANSINDEX int(11) NOT NULL, ROWKEY int(11) NOT NULL,
UNIQUE KEY PRIMARY (MANDT,RETAILSTOREID,BUSINESSDAYDATE,TRANSINDEX,ROWKEY) USING HASH,
SHARD KEY __SHARDKEY (MANDT,RETAILSTOREID,BUSINESSDAYDATE,TRANSINDEX,ROWKEY),
KEY __UNORDERED () USING CLUSTERED COLUMNSTORE
) ;
On Kafka side my topic contains 2800 messages.
Each message is structured as follow:
schema: Contains record structure definition
payload: Contains the data
I create my pipeline
CREATE OR REPLACE PIPELINE p_tbl AS
LOAD DATA KAFKA ‘localhost:9092/tbl-events’
REPLACE INTO TABLE my_table
(
MANDT<-payload::MANDT,
RETAILSTOREID<-payload::RETAILSTOREID,
BUSINESSDAYDATE<-payload::BUSINESSDAYDATE,
TRANSINDEX<-payload::TRANSINDEX,
ROWKEY<-payload::ROWKEY
)
FORMAT JSON;
The config looks OK. If you got a result rather than an error, that means that we successfully connected to Kafka and downloaded at least one JSON object with the the expected schema (by default, we throw errors if fields mentioned in the mapping clause are missing from the input).
One row would be expected because of that limit 1. By empty you mean that all columns are NULL/0? The simplest explanation would be destination type vs input data incompatibility issues. We have some “assign NULL or 0 after conversion error” semantics for the sake of mysql compatibility, though raising @@data_conversion_compatibility_level will trigger errors instead.
If you create a pipeline from the same topic, but into a table with one BLOB column, with a mapping clause of (the_blob_col <- %), then that same test pipeline query will return the raw JSON data that the real pipeline would be unpacking. That might make any compatibility issues evident.
Thank you for your feedback. Based on your comment.
I have created a table
CREATE TABLE kafka_msg(
msg BLOB
);
Then create the pipeline
CREATE OR REPLACE PIPELINE p_msg AS
LOAD DATA KAFKA ‘hostname:9092/tbl-events’
REPLACE INTO TABLE kafka_msg
(
msg <- %
)
FORMAT JSON;
Then test it test pipeline p_msg limit 1;
I believe that the JSON message was not well formated so I try to load a message extract from Kafka using LOAD DATA. Here I succeed to load the data into my table
I have found my error. I start the pipeline and then stop it after a while. In that case the pipeline offset is the last read.
By using the command ALTER PIPELINE p_tbl SET OFFSETS EARLIEST I’m able to consume data again.