I am facing an issue while ingesting JSON data through a pipeline into a table using a stored procedure.
NULL values are being inserted into the table.
Procedure script:
DELIMITER //
CREATE OR REPLACE PROCEDURE ops.process_users(GENERIC_BATCH query(GENERIC_JSON json)) AS
BEGIN
INSERT INTO ops.USER(USER_ID,USERNAME)
SELECT GENERIC_JSON::USER_ID, GENERIC_JSON::USERNAME
FROM GENERIC_BATCH;
END //
DELIMITER ;
Pipeline command:
CREATE OR REPLACE PIPELINE ops.tweet_pipeline_with_sp AS LOAD DATA KAFKA '<KAFKA_SERVER_IP>:9092/user-topic'
INTO PROCEDURE ops.process_users FORMAT JSON;
Data pushed to the topic: {"USER_ID":"111","USERNAME":"Test_User"}
FORMAT JSON pipelines take a subvalue_mapping clause that tells us how to map key paths in each input JSON object to columns of the destination table or QTV (here, the GENERIC_BATCH parameter of the procedure). When the clause is omitted, we fall back to looking for keys that match the names of the destination's columns. So in this case we looked for the key "GENERIC_JSON" in the input object and, because that key was missing, assigned NULL.
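To make that fallback concrete, here is a small Python model of it. It is a hypothetical sketch, not MemSQL's implementation: `default_mapping` is an illustrative name, only top-level keys are considered, and key matching is assumed to be exact and case-sensitive.

```python
import json

def default_mapping(record: str, columns: list) -> dict:
    """Hypothetical model of FORMAT JSON with no subvalue_mapping clause:
    each destination column is filled from the top-level key with the
    same name, or None (NULL) when that key is absent."""
    obj = json.loads(record)
    return {col: obj.get(col) for col in columns}

message = '{"USER_ID":"111","USERNAME":"Test_User"}'

# Loading straight into ops.USER: the column names match the keys.
print(default_mapping(message, ["USER_ID", "USERNAME"]))

# Loading into the procedure: its only column is GENERIC_JSON,
# which is not a key of the message, so it becomes None (NULL).
print(default_mapping(message, ["GENERIC_JSON"]))
```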
For this particular example, you can and should (for performance reasons) omit the stored procedure:
create or replace pipeline … into table ops.USER(USER_ID <- USER_ID, USERNAME <- USERNAME) format JSON
If you really do need the stored procedure, the fix is to tell us explicitly to assign the entire JSON object to GENERIC_JSON, using the special path %:
create or replace pipeline … into procedure ops.process_users(GENERIC_JSON <- %) format JSON
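Both explicit mappings can be modelled the same way. This is again a hypothetical Python sketch: `explicit_mapping` and `WHOLE_OBJECT` are illustrative names, each path is a single top-level key, and a missing key simply raises KeyError here rather than reproducing whatever the pipeline does in that case.

```python
import json

WHOLE_OBJECT = "%"  # special path: assign the entire input JSON object

def explicit_mapping(record: str, mapping: dict) -> dict:
    """Hypothetical model of a subvalue_mapping clause such as
    (USER_ID <- USER_ID, USERNAME <- USERNAME) or (GENERIC_JSON <- %).
    Only single top-level keys are modelled; nested paths are not."""
    obj = json.loads(record)
    row = {}
    for column, path in mapping.items():
        if path == WHOLE_OBJECT:
            row[column] = obj        # the whole object goes into one column
        else:
            row[column] = obj[path]  # KeyError if absent (not modelled)
    return row

message = '{"USER_ID":"111","USERNAME":"Test_User"}'

# Loading straight into the table, one key per column.
print(explicit_mapping(message, {"USER_ID": "USER_ID", "USERNAME": "USERNAME"}))

# Loading into the procedure: GENERIC_JSON receives the whole object.
print(explicit_mapping(message, {"GENERIC_JSON": WHOLE_OBJECT}))
```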
In case you haven’t seen it, the semantics of AS LOAD DATA … FORMAT JSON are defined here:
I tried GENERIC_JSON <- %, and it works, but with some minor flaws.
Pipeline script:
CREATE OR REPLACE PIPELINE ops.tweet_pipeline_with_sp AS LOAD DATA KAFKA '172.17.0.3:9092/user-topic-1'
INTO PROCEDURE ops.process_users(GENERIC_JSON <- %) FORMAT JSON;
I believe the issue is that the :: operator returns JSON subobjects as JSON rather than as an automatically inferred SQL type, so a JSON string comes back with its surrounding double quotes. To extract a subobject and convert it to a SQL string, use ::$ or json_extract_string(), e.g.:
MemSQL [db]> create table t(a json);
MemSQL [db]> insert into t values('{"a":"hello"}');
MemSQL [db]> select a::a from t;
+---------+
| a::a    |
+---------+
| "hello" |
+---------+
1 row in set (0.221 sec)
MemSQL [db]> select json_extract_string(a, 'a') from t;
+-----------------------------+
| json_extract_string(a, 'a') |
+-----------------------------+
| hello                       |
+-----------------------------+
1 row in set (0.186 sec)
MemSQL [db]> select a::$a from t;
+-------+
| a::$a |
+-------+
| hello |
+-------+
1 row in set (0.207 sec)
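The same difference can be modelled with Python's standard json module. This is a hypothetical sketch, not MemSQL's code: `extract_json` stands in for `::` and `extract_string` for `::$` / json_extract_string(), and the handling of non-string values is an assumption. Applied to the message from the question, it shows why GENERIC_JSON::USERNAME would keep the double quotes.

```python
import json

def extract_json(doc: str, key: str) -> str:
    """Model of doc::key: the subvalue comes back still encoded as JSON."""
    return json.dumps(json.loads(doc)[key])

def extract_string(doc: str, key: str) -> str:
    """Model of doc::$key and json_extract_string(doc, key) for string values.
    Assumption: any non-string value falls back to its JSON text."""
    value = json.loads(doc)[key]
    return value if isinstance(value, str) else json.dumps(value)

t_row = '{"a":"hello"}'
print(extract_json(t_row, "a"))    # "hello"  (quotes are part of the value)
print(extract_string(t_row, "a"))  # hello

user = '{"USER_ID":"111","USERNAME":"Test_User"}'
print(extract_json(user, "USERNAME"))    # "Test_User"
print(extract_string(user, "USERNAME"))  # Test_User
```

In the procedure, that suggests selecting GENERIC_JSON::$USER_ID and GENERIC_JSON::$USERNAME instead of GENERIC_JSON::USER_ID and GENERIC_JSON::USERNAME.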