To replace Elasticsearch, I am running a test that collects JSON data using the Kafka PIPELINE feature in MemSQL. When trying to load data from a Kafka topic into MemSQL, the pipeline stopped with the error “Invalid JSON value for column ‘message’”.
Looking at the Information Schema table, it appears that the error occurred because data containing an escaped double quote (\") could not be loaded.
When loading data through the procedure, the problem seems to be that special characters (sequences starting with \) are not handled. What is the solution to this? (It should not involve using \.)
-
Test Environment
- Data flow : metricbeat → kafka→ MemSQL PIPELINE → MemSQL Tables
- MemSQL Version : 7.1.7
-
PIPELINE Configuration
2.1 Procedure
DELIMITER //
-- proc_mbeats: stored procedure that the pl_mbeats pipeline loads into.
-- Parameter:
--   kafka_metricbeat  query-type argument with a single column, message, typed json.
-- The two "." lines below are presumably placeholders for a body the author
-- left out of the post; they are kept verbatim.
CREATE OR REPLACE PROCEDURE proc_mbeats (kafka_metricbeat query(message json))
AS
BEGIN
.
.
END //
DELIMITER ;
2.2 Pipeline
-- pl_mbeats: Kafka pipeline (endpoint string 'localhost/metricbeat') whose
-- batches are handed to the stored procedure proc_mbeats.
-- NOTE(review): no FORMAT or FIELDS ... ESCAPED BY clause is given, so the
-- default delimited-text parsing presumably applies, in which backslash is the
-- escape character. That would strip the backslash from \" inside the payload
-- and could explain the "Invalid JSON value" error; confirm against the
-- LOAD DATA documentation before changing the statement.
CREATE PIPELINE pl_mbeats
AS LOAD DATA KAFKA 'localhost/metricbeat'
INTO PROCEDURE proc_mbeats;
2.3 Error message & Info Schema
* Error_Message : Invalid JSON value for column ‘message’
* Error_Kind : Load / Load_Data_Line :
{"metricset":{"name":"process","period":10000},"ecs":{"version":"1.5.0"},"event":{"duration":315087751,"dataset":"system.process","module":"system"},"@version":"1","system":{"process":{"cmdline":"/usr/bin/ssh-agent /bin/sh -c exec -l /bin/bash -c \"env GNOME_SHELL_SESSION_MODE=classic gnome-session --session gnome-classic\"","state":"sleeping","cgroup":{"cpuacct":{"total":{"ns":1322048427228212},"percpu":{"2":344223580301541,"3":339673452025808,"4":335369976030739,"1":302781418870124},"stats":{"system":{"ns":550364150000000},"user":{"ns":678525700000000}},"path":"/user.slice","id":"user.slice"},"id":"user.slice","blkio":{"total":{"ios":29544670,"bytes":573001093120},"path":"/user.slice","id":"user.slice"},"cpu":{"cfs":{"period":{"us":100000},"quota":{"us":0},"shares":1024},"rt":{"period":{"us":1000000},"runtime":{"us":0}},"stats":{"throttled":{"ns":0,"periods":0},"periods":0},"path":"/user.slice","id":"user.slice"},"path":"/user.slice","memory":{"kmem":{"usage":{"max":{"bytes":0},"bytes":0},"limit":{"bytes":9223372036854771712},"failures":0},"id":"user.slice","mem":{"usage":{"max":{"bytes":6548250624},"bytes":5639651328},"limit":{"bytes":9223372036854771712},"failures":0},"memsw":{"usage":{"max":{"bytes":7810928640},"bytes":7496921088},"limit":{"bytes":9223372036854771712},"failures":0},"stats":{"page_faults":5320872892,"unevictable":{"bytes":0},"active_file":{"bytes":730714112},"swap":{"bytes":1857269760},"inactive_anon":{"bytes":924241920},"cache":{"bytes":3842818048},"pages_out":1041190970,"rss":{"bytes":1796628480},"rss_huge":{"bytes":10485760},"pages_in":1042542749,"hierarchical_memory_limit":{"bytes":9223372036854771712},"hierarchical_memsw_limit":{"bytes":9223372036854771712},"active_anon":{"bytes":3241947136},"inactive_file":{"bytes":742543360},"major_page_faults":102542,"mapped_file":{"bytes":2297708544}},"kmem_tcp":{"failures":0,"limit":{"bytes":9223372036854771712},"usage":{"max":{"bytes":0},"bytes":0}},"path":"/user.slice"}},"memory":{"rss":{"pct":0,"byt
es":49152},"size":74211328,"share":0},"cpu":{"total":{"value":48980,"pct":0.001,"norm":{"pct":2.0E-4}},"start_time":"2020-07-21T02:25:14.000Z"}}},"agent":{"version":"7.9.0","hostname":"apigw.itmsg.com","id":"2e3eb88f-2eaa-45db-a34a-1cc85c3cbeca","name":"apigw.itmsg.com","type":"metricbeat","ephemeral_id":"2e09d098-68a3-4c4c-8998-c2121f32f904"},"service":{"type":"system"},"@timestamp":"2020-10-10T04:29:28.946Z","user":{"name":"itmsg"},"tags":["beats_input_raw_event"],"cloud":{"iscloud":"no"},"host":{"hostname":"apigw.itmsg.com","os":{"version":"7 (Core)","family":"redhat","name":"CentOS Linux","codename":"Core","kernel":"3.10.0-1127.el7.x86_64","platform":"centos"},"name":"apigw.itmsg.com","architecture":"x86_64","id":"812d3a73deef42039707c92b4f3de3a9","containerized":false,"ip":["192.168.0.191","fe80::a25d:431:d1a9:c168","192.168.122.1","172.17.0.1"],"mac":["00:50:56:90:21:7b","52:54:00:4c:1a:18","52:54:00:4c:1a:18","02:42:00:9d:c6:56"]},"process":{"name":"ssh-agent","ppid":22549,"pid":22694,"args":["/usr/bin/ssh-agent","/bin/sh","-c","exec -l /bin/bash -c \"env GNOME_SHELL_SESSION_MODE=classic gnome-session --session gnome-classic\""],"pgid":22694}}
In addition, I added the column name as below, but the situation is the same.
-- Tear down the existing pipeline, recreate it with an explicit column list
-- (message) after the procedure name, sample one record, then start it.
STOP PIPELINE pl_mbeats;
DROP PIPELINE pl_mbeats;

CREATE PIPELINE pl_mbeats
AS LOAD DATA KAFKA 'localhost/metricbeat'
INTO PROCEDURE proc_mbeats (message);

-- Dry run on a single record before the pipeline is started.
TEST PIPELINE pl_mbeats LIMIT 1;
START PIPELINE pl_mbeats