Kafka Pipeline Optimization for Real-Time Ingestion

Hello.

We’re currently testing a Kafka pipeline.
The goal is to ingest large volumes of data in real time with under one second of latency.

As the data volume increases, how can we keep latency consistently under one second?
I’d like to know which options can reduce the latency between the time data is generated and the time MemSQL consumes it.
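To make "under one second" measurable, you could record two timestamps for each row: when it was generated and when it was written. You can then look at percentiles rather than averages. The sketch below assumes you have such a (generated_at, ingested_at) pair per row. The function names and sample values are hypothetical, and it uses the nearest-rank percentile definition.

```python
import math
from datetime import datetime, timedelta


def ingest_latencies_ms(rows: list[tuple[datetime, datetime]]) -> list[float]:
    """Per-row latency in ms from (generated_at, ingested_at) pairs (hypothetical columns)."""
    one_ms = timedelta(milliseconds=1)
    # timedelta / timedelta gives a float, here the latency in milliseconds.
    return [(ingested - generated) / one_ms for generated, ingested in rows]


def latency_percentile(latencies_ms: list[float], pct: float) -> float:
    """Nearest-rank percentile (0 < pct <= 100) of a non-empty sample."""
    if not latencies_ms:
        raise ValueError("no latency samples")
    ordered = sorted(latencies_ms)
    rank = math.ceil(pct / 100 * len(ordered))  # 1-based rank
    return ordered[max(rank, 1) - 1]


def share_under(latencies_ms: list[float], limit_ms: float = 1000.0) -> float:
    """Fraction of rows ingested strictly within limit_ms."""
    return sum(1 for x in latencies_ms if x < limit_ms) / len(latencies_ms)
```

Watching p99 and the share of rows under 1000 ms as the producer rate increases shows whether latency stays steady or drifts upward. A mean can hide that drift.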

We are considering the following options:

  1. Increasing the number of leaf nodes.
  2. Setting the number of partitions to match the number of physical cores.
  3. Provisioning a sufficient number of Kafka cluster servers.
  4. Kafka producer options:

    "retries", 0
    "acks", "0"
    "batch.size", 16384           // 16 KB
    "linger.ms", 1                // ms
    "max.request.size", 33554432  // 32 MB
    "buffer.memory", 33554432     // 32 MB

  5. Kafka broker options:

    num.network.threads, default
    num.io.threads, default

  6. MemSQL engine variables:

    pipelines_max_offsets_per_batch_partition, default
    pipelines_extractor_idle_timeout_ms, default
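As a quick sanity check on the byte-valued producer settings in item 4, the sketch below converts them to binary units. It also flags one documented interaction: with `acks=0` the producer does not wait for broker acknowledgements, so `retries` does not take effect. The dict keys are the Java Kafka client's property names. The helper functions are hypothetical and do not call any Kafka library.

```python
# Producer settings from item 4, keyed by the Java Kafka client's property
# names. This is plain data for illustration; no Kafka library is used.
PRODUCER_CONFIG = {
    "retries": 0,
    "acks": "0",
    "batch.size": 16384,           # bytes
    "linger.ms": 1,                # milliseconds
    "max.request.size": 33554432,  # bytes
    "buffer.memory": 33554432,     # bytes
}


def human_bytes(n: int) -> str:
    """Format a byte count with binary units (B, KiB, MiB, GiB)."""
    value = float(n)
    for unit in ("B", "KiB", "MiB"):
        if value < 1024:
            return f"{value:g} {unit}"
        value /= 1024
    return f"{value:g} GiB"


def config_warnings(cfg: dict) -> list[str]:
    """Hypothetical lint: flags settings that cannot take effect together."""
    warnings = []
    # With acks=0 the producer never learns about failed sends,
    # so the retries setting does not take effect.
    if str(cfg.get("acks")) == "0" and cfg.get("retries", 0) > 0:
        warnings.append("retries has no effect when acks=0")
    return warnings


for key in ("batch.size", "max.request.size", "buffer.memory"):
    print(f"{key}: {PRODUCER_CONFIG[key]} bytes = {human_bytes(PRODUCER_CONFIG[key])}")
```

Note that 16384 bytes is 16 KiB, which is also the Java client's default `batch.size`. Using `acks=0` together with `retries=0` minimizes producer-side waiting, but a failed send can then be lost silently.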

Please let me know the appropriate settings for the above options, and any additional options that could reduce latency.

Our test results are as follows:

  • Environment

    Server: Oracle Cloud VM.DenseIO.2.24 (24 cores, 24.6 Gbps network, SSD)
    MemSQL: Version 7.0.5 Beta, Columnstore

  • Test results: [figure]

  • Comparison: [figure]

Thank you.

Could you run one of the pipelines that shows too-high latency and provide the output of the following queries:

  • profile pipeline p
  • select * from information_schema.pipelines_batches where pipeline_name = "p" order by batch_start_unix_timestamp desc limit 20;
  • select * from information_schema.pipelines_batches where pipeline_name = "p" order by batch_time desc limit 20;

The most likely culprit here is a too-high pipeline batch interval. If you don’t have a BATCH_INTERVAL clause in CREATE PIPELINE, you get the default of 2500 ms. The batch interval trades throughput against latency. If you find that it has an effect, I’d recommend tuning it for the producer rate you expect to encounter. We’ll be doing some work on BATCH_INTERVAL to make this less of a pitfall in the future.

If it’s not that, I’d look for a network, disk, or non-MemSQL CPU bottleneck, given that leaf CPU utilization is low. The output of those queries should help us identify where to look next.
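Here is a deliberately simplified model that shows why the batch interval can dominate. It is an assumption for illustration, not a description of MemSQL internals. Suppose a record arrives just after a batch has picked its offsets. It waits for that batch to finish, then for the batch interval, then for the whole next batch. Under that model, the default 2500 ms interval alone rules out sub-second latency. Even with a near-zero interval, each batch must finish in under roughly half a second.

```python
def worst_case_latency_s(batch_interval_ms: float, batch_time_s: float) -> float:
    """Rough upper bound (seconds) on how long a record waits to be committed.

    Simplified model, assumed for illustration: the record just misses the
    current batch, waits for it to finish, waits the batch interval, and is
    then loaded by the next batch, which takes batch_time_s again.
    """
    return batch_time_s + batch_interval_ms / 1000.0 + batch_time_s


def max_batch_time_for_target(batch_interval_ms: float, target_s: float) -> float:
    """Largest batch time that keeps the modelled bound within target_s."""
    return max(0.0, (target_s - batch_interval_ms / 1000.0) / 2)


# Default interval vs. a near-zero interval, with a hypothetical 0.5 s batch.
for interval_ms in (2500, 1):
    print(interval_ms, "ms ->", round(worst_case_latency_s(interval_ms, 0.5), 3), "s")
```

The model ignores queuing in Kafka and commit overhead, so treat it as a way to compare settings rather than as a prediction.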


Hello, sasha.
Thanks for your reply.

We use only one pipeline.
The batch interval of the pipeline was set to 1 ms.

CREATE PIPELINE quickstart_kafka10 AS LOAD DATA KAFKA '$p_kafka_ip/$p_kafka_topic'
BATCH_INTERVAL 1
INTO TABLE sensor_data_table
(timestamp, name, chid, lotid, ppid, recipeid, chstep, stepseq, partid, status, slotno, sensor, value, lcl, ucl)
FIELDS TERMINATED BY ','  
LINES TERMINATED BY '\n';
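For reference, this pipeline expects each Kafka message to contain lines of 15 comma-separated fields in the column order listed above, each line ending in `\n`. The statement has no ENCLOSED BY clause, so a value that itself contained a comma or newline would likely be misparsed. The producer-side sketch below rejects such values rather than escaping them. The function name is hypothetical.

```python
# Column order copied from the CREATE PIPELINE statement above.
COLUMNS = (
    "timestamp", "name", "chid", "lotid", "ppid", "recipeid", "chstep",
    "stepseq", "partid", "status", "slotno", "sensor", "value", "lcl", "ucl",
)


def to_pipeline_line(record: dict) -> str:
    """Serialize one record for FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n'."""
    fields = []
    for column in COLUMNS:
        text = str(record[column])
        # No ENCLOSED BY clause: embedded delimiters would likely be misparsed.
        if "," in text or "\n" in text:
            raise ValueError(f"column {column!r} contains a delimiter: {text!r}")
        fields.append(text)
    return ",".join(fields) + "\n"
```

Batching many such lines into one Kafka message is also possible, because the pipeline splits on the line terminator.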

The results of the queries you requested are as follows.

  • profile pipeline quickstart_kafka10;
memsql> profile pipeline quickstart_kafka10;

PROFILE
Gather partitions:all est_rows:5,498,511,824 alias:remote_0 actual_rows: 3,596,125 exec_time: 0ms
InsertInto fdc.sensor_data_table [r0.timestamp, r0.name, r0.chid, r0.lotid, r0.ppid, r0.recipeid, r0.chstep, r0.stepseq, r0.partid, r0.status, r0.slotno, r0.sensor, r0.value, r0.lcl, r0.ucl, r0.`NOW()`] local:yes est_rows:5,498,511,824 actual_rows: 3,596,125 exec_time: 1,091ms start_time: [00:00:00.147, 00:00:00.158] Segment Count: 48 Segment Sort: 682ms Segment Compress: 137ms Auto Stats: 39ms
TableScan r0 storage:list stream:yes table_type:sharded  exec_time: 0ms start_time: [00:00:03.335, 00:00:03.471] end_time: [00:00:03.335, 00:00:03.471]
Repartition [sensor_data_table_src.timestamp, sensor_data_table_src.name, sensor_data_table_src.chid, sensor_data_table_src.lotid, sensor_data_table_src.ppid, sensor_data_table_src.recipeid, sensor_data_table_src.chstep, sensor_data_table_src.stepseq, sensor_data_table_src.partid, sensor_data_table_src.status, sensor_data_table_src.slotno, sensor_data_table_src.sensor, sensor_data_table_src.value, sensor_data_table_src.lcl, sensor_data_table_src.ucl, NOW(6) AS `NOW()`] AS r0 shard_key:[name, sensor] est_rows:5,498,511,824 actual_rows: 3,596,125 exec_time: 463ms start_time: [00:00:00.100, 00:00:00.119] network_traffic: 720,785.375000 KB network_time: 225ms
ExternalIngest pipeline:quickstart_kafka10 table_type:sharded_columnstore
|---FileParse actual_rows: 3,596,149 exec_time: 75ms start_time: [00:00:00.027, 00:00:00.040] end_time: [00:00:03.011, 00:00:03.333]
|---Extract streamed_bytes: 666,754.500000 KB exec_time: 221ms start_time: [00:00:00.027, 00:00:00.040]
+---PopulateRow actual_rows: 3,596,125 exec_time: 562ms start_time: [00:00:00.100, 00:00:00.119] end_time: [00:00:03.011, 00:00:03.333]
Compile Total Time: 36ms
9 rows in set (3.95 sec)
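In the profile above, InsertInto has the largest exec_time (1,091 ms, with Segment Sort reported at 682 ms). It is followed by PopulateRow (562 ms) and Repartition (463 ms). For longer profiles, a small hypothetical helper like the one below can extract each operator's exec_time and sort the results. It assumes the `exec_time: Nms` format seen here. It also treats the values as directly comparable, which may not hold for operators that run in parallel. It works whether or not the lines keep the `|` borders of the mysql client table.

```python
import re

# Matches e.g. "exec_time: 1,091ms"; the comma is a thousands separator.
EXEC_TIME = re.compile(r"exec_time: ([\d,]+)ms")


def operator_times(profile_lines: list[str]) -> list[tuple[str, int]]:
    """Return (operator, exec_time_ms) pairs from PROFILE output, slowest first."""
    results = []
    for line in profile_lines:
        match = EXEC_TIME.search(line)
        if match is None:
            continue  # e.g. "Compile Total Time" or operators without exec_time
        # Drop table borders and tree prefixes such as "| |---" or "+---".
        name = line.strip().lstrip("|+- ").split()[0]
        results.append((name, int(match.group(1).replace(",", ""))))
    # sorted() is stable even with reverse=True, so ties keep their input order.
    return sorted(results, key=lambda pair: pair[1], reverse=True)
```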
  • select * from information_schema.pipelines_batches where pipeline_name = "quickstart_kafka10" order by batch_start_unix_timestamp desc limit 20;

memsql> select * from information_schema.pipelines_batches where pipeline_name = "quickstart_kafka10" order by batch_start_unix_timestamp desc limit 20;

+---------------+--------------------+----------+-------------+--------------------+------------+----------------------------+-----------------------+-----------------------------+---------------------------+-----------------------+---------------------+----------------------+---------------------------------+-----------------------------------+-------------------------------------+-------------------------------------+-------------+------+-----------+
| DATABASE_NAME | PIPELINE_NAME      | BATCH_ID | BATCH_STATE | BATCH_ROWS_WRITTEN | BATCH_TIME | BATCH_START_UNIX_TIMESTAMP | BATCH_PARTITION_STATE | BATCH_PARTITION_PARSED_ROWS | BATCH_SOURCE_PARTITION_ID | BATCH_EARLIEST_OFFSET | BATCH_LATEST_OFFSET | BATCH_PARTITION_TIME | BATCH_PARTITION_EXTRACTED_BYTES | BATCH_PARTITION_TRANSFORMED_BYTES | BATCH_PARTITION_EXTRACTOR_WAIT_TIME | BATCH_PARTITION_TRANSFORM_WAIT_TIME | HOST        | PORT | PARTITION |
+---------------+--------------------+----------+-------------+--------------------+------------+----------------------------+-----------------------+-----------------------------+---------------------------+-----------------------+---------------------+----------------------+---------------------------------+-----------------------------------+-------------------------------------+-------------------------------------+-------------+------+-----------+
| fdc           | quickstart_kafka10 |    58484 | Succeeded   |            1752355 |   1.761614 |          1576635632.322004 | Succeeded             |                       73034 | 15                        |              13372903 |            13445937 |             0.707161 |                        13356992 |                              NULL |                              0.0768 |                                   0 | 10.0.50.112 | 3306 |        23 |
| fdc           | quickstart_kafka10 |    58484 | Succeeded   |            1752355 |   1.761614 |          1576635632.322004 | Succeeded             |                       73013 | 6                         |              13372880 |            13445893 |              0.70299 |                        13354528 |                              NULL |                              0.0794 |                                   0 | 10.0.50.112 | 3306 |         5 |
| fdc           | quickstart_kafka10 |    58484 | Succeeded   |            1752355 |   1.761614 |          1576635632.322004 | Succeeded             |                       72991 | 3                         |              13372879 |            13445870 |             0.725145 |                        13345525 |                              NULL |                              0.0758 |                                   0 | 10.0.50.112 | 3306 |        19 |
| fdc           | quickstart_kafka10 |    58484 | Succeeded   |            1752355 |   1.761614 |          1576635632.322004 | Succeeded             |                       73034 | 20                        |              13372903 |            13445937 |             0.694565 |                        13356910 |                              NULL |                              0.0818 |                                   0 | 10.0.50.112 | 3306 |        16 |
| fdc           | quickstart_kafka10 |    58484 | Succeeded   |            1752355 |   1.761614 |          1576635632.322004 | Succeeded             |                       73018 | 14                        |              13372902 |            13445920 |             0.721043 |                        13356089 |                              NULL |                              0.0746 |                                   0 | 10.0.50.112 | 3306 |        10 |
| fdc           | quickstart_kafka10 |    58484 | Succeeded   |            1752355 |   1.761614 |          1576635632.322004 | Succeeded             |                       73018 | 13                        |              13372902 |            13445920 |             0.692218 |                        13352664 |                              NULL |                               0.078 |                                   0 | 10.0.50.112 | 3306 |         1 |
| fdc           | quickstart_kafka10 |    58484 | Succeeded   |            1752355 |   1.761614 |          1576635632.322004 | Succeeded             |                       73067 | 21                        |              13372903 |            13445970 |             0.711627 |                        13368672 |                              NULL |                              0.0764 |                                   0 | 10.0.50.112 | 3306 |         7 |
| fdc           | quickstart_kafka10 |    58484 | Succeeded   |            1752355 |   1.761614 |          1576635632.322004 | Succeeded             |                       73036 | 12                        |              13372902 |            13445938 |             0.695336 |                        13362336 |                              NULL |                              0.0786 |                                   0 | 10.0.50.112 | 3306 |         4 |
| fdc           | quickstart_kafka10 |    58484 | Succeeded   |            1752355 |   1.761614 |          1576635632.322004 | Succeeded             |                       72960 | 1                         |              13372866 |            13445826 |             0.693338 |                        13348578 |                              NULL |                               0.066 |                                   0 | 10.0.50.112 | 3306 |        12 |
| fdc           | quickstart_kafka10 |    58484 | Succeeded   |            1752355 |   1.761614 |          1576635632.322004 | Succeeded             |                       73034 | 16                        |              13372903 |            13445937 |             0.666259 |                        13360595 |                              NULL |                              0.0748 |                                   0 | 10.0.50.112 | 3306 |         9 |
| fdc           | quickstart_kafka10 |    58484 | Succeeded   |            1752355 |   1.761614 |          1576635632.322004 | Succeeded             |                       73051 | 23                        |              13372904 |            13445955 |             0.697435 |                        13368579 |                              NULL |                              0.0748 |                                   0 | 10.0.50.112 | 3306 |        11 |
| fdc           | quickstart_kafka10 |    58484 | Succeeded   |            1752355 |   1.761614 |          1576635632.322004 | Succeeded             |                       73010 | 10                        |              13372885 |            13445895 |             0.656598 |                        13355171 |                              NULL |                              0.0676 |                                   0 | 10.0.50.112 | 3306 |         0 |
| fdc           | quickstart_kafka10 |    58484 | Succeeded   |            1752355 |   1.761614 |          1576635632.322004 | Succeeded             |                       72997 | 5                         |              13372880 |            13445877 |              0.63179 |                        13349363 |                              NULL |                              0.0638 |                                   0 | 10.0.50.112 | 3306 |        18 |
| fdc           | quickstart_kafka10 |    58484 | Succeeded   |            1752355 |   1.761614 |          1576635632.322004 | Succeeded             |                       73034 | 19                        |              13372904 |            13445938 |             0.747629 |                        13358418 |                              NULL |                              0.0752 |                                   0 | 10.0.50.112 | 3306 |        22 |
| fdc           | quickstart_kafka10 |    58484 | Succeeded   |            1752355 |   1.761614 |          1576635632.322004 | Succeeded             |                       73051 | 22                        |              13372904 |            13445955 |             0.701386 |                        13363683 |                              NULL |                              0.0766 |                                   0 | 10.0.50.112 | 3306 |        20 |
| fdc           | quickstart_kafka10 |    58484 | Succeeded   |            1752355 |   1.761614 |          1576635632.322004 | Succeeded             |                       73013 | 7                         |              13372880 |            13445893 |             0.698237 |                        13351827 |                              NULL |                               0.075 |                                   0 | 10.0.50.112 | 3306 |        21 |
| fdc           | quickstart_kafka10 |    58484 | Succeeded   |            1752355 |   1.761614 |          1576635632.322004 | Succeeded             |                       73016 | 17                        |              13372903 |            13445919 |             0.736507 |                        13356333 |                              NULL |                              0.0814 |                                   0 | 10.0.50.112 | 3306 |         6 |
| fdc           | quickstart_kafka10 |    58484 | Succeeded   |            1752355 |   1.761614 |          1576635632.322004 | Succeeded             |                       72976 | 4                         |              13372879 |            13445855 |              0.68987 |                        13346491 |                              NULL |                              0.0794 |                                   0 | 10.0.50.112 | 3306 |         3 |
| fdc           | quickstart_kafka10 |    58484 | Succeeded   |            1752355 |   1.761614 |          1576635632.322004 | Succeeded             |                       73051 | 18                        |              13372904 |            13445955 |             0.691727 |                        13356595 |                              NULL |                              0.0826 |                                   0 | 10.0.50.112 | 3306 |        15 |
| fdc           | quickstart_kafka10 |    58484 | Succeeded   |            1752355 |   1.761614 |          1576635632.322004 | Succeeded             |                       72961 | 0                         |              13372865 |            13445826 |             0.618578 |                        13346150 |                              NULL |                              0.0732 |                                   0 | 10.0.50.112 | 3306 |        13 |
+---------------+--------------------+----------+-------------+--------------------+------------+----------------------------+-----------------------+-----------------------------+---------------------------+-----------------------+---------------------+----------------------+---------------------------------+-----------------------------------+-------------------------------------+-------------------------------------+-------------+------+-----------+
20 rows in set (0.01 sec)
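These rows all belong to batch 58484. Partition numbers run from 0 to 23, so there appear to be 24 partitions, and `limit 20` returns only 20 of them (partitions 2, 8, 14 and 17 are missing). A few checks can be computed directly from the columns:

- For each partition, BATCH_LATEST_OFFSET minus BATCH_EARLIEST_OFFSET equals BATCH_PARTITION_PARSED_ROWS (for example, 13445937 - 13372903 = 73034).
- The batch wrote about 995,000 rows per second.
- The slowest listed partition took about 0.75 s of the 1.76 s BATCH_TIME. This suggests that a sizable part of each batch is spent outside the per-partition work that BATCH_PARTITION_TIME reports, although these columns alone do not show where.

The sketch below reproduces those calculations. The dataclass and function names are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class PartitionRow:
    """Selected columns from one row of information_schema.pipelines_batches."""

    partition: int
    parsed_rows: int          # BATCH_PARTITION_PARSED_ROWS
    earliest_offset: int      # BATCH_EARLIEST_OFFSET
    latest_offset: int        # BATCH_LATEST_OFFSET
    partition_time_s: float   # BATCH_PARTITION_TIME


def summarize_batch(rows_written: int, batch_time_s: float,
                    parts: list[PartitionRow]) -> dict:
    """Throughput and per-partition checks for a single batch."""
    slowest = max(p.partition_time_s for p in parts)
    return {
        "rows_per_s": rows_written / batch_time_s,
        # True when every partition's offset range equals its parsed row
        # count, i.e. (assumption) one Kafka message per row.
        "offsets_match_rows": all(
            p.latest_offset - p.earliest_offset == p.parsed_rows for p in parts
        ),
        "slowest_partition_s": slowest,
        "slowest_share_of_batch": slowest / batch_time_s,
    }


# Three of the partition rows for batch 58484, copied from the output above.
batch_58484 = [
    PartitionRow(23, 73034, 13372903, 13445937, 0.707161),
    PartitionRow(5, 73013, 13372880, 13445893, 0.70299),
    PartitionRow(22, 73034, 13372904, 13445938, 0.747629),
]
summary = summarize_batch(1752355, 1.761614, batch_58484)
```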

  • select * from information_schema.pipelines_batches where pipeline_name = "quickstart_kafka10" order by batch_time desc limit 20;

+---------------+--------------------+----------+-------------+--------------------+------------+----------------------------+-----------------------+-----------------------------+---------------------------+-----------------------+---------------------+----------------------+---------------------------------+-----------------------------------+-------------------------------------+-------------------------------------+-------------+------+-----------+
| DATABASE_NAME | PIPELINE_NAME      | BATCH_ID | BATCH_STATE | BATCH_ROWS_WRITTEN | BATCH_TIME | BATCH_START_UNIX_TIMESTAMP | BATCH_PARTITION_STATE | BATCH_PARTITION_PARSED_ROWS | BATCH_SOURCE_PARTITION_ID | BATCH_EARLIEST_OFFSET | BATCH_LATEST_OFFSET | BATCH_PARTITION_TIME | BATCH_PARTITION_EXTRACTED_BYTES | BATCH_PARTITION_TRANSFORMED_BYTES | BATCH_PARTITION_EXTRACTOR_WAIT_TIME | BATCH_PARTITION_TRANSFORM_WAIT_TIME | HOST        | PORT | PARTITION |
+---------------+--------------------+----------+-------------+--------------------+------------+----------------------------+-----------------------+-----------------------------+---------------------------+-----------------------+---------------------+----------------------+---------------------------------+-----------------------------------+-------------------------------------+-------------------------------------+-------------+------+-----------+
| fdc           | quickstart_kafka10 |    58426 | Succeeded   |            2547913 |   4.145809 |          1576635450.419766 | Succeeded             |                      106143 | 23                        |               7437199 |             7543342 |              1.04369 |                        19542585 |                              NULL |                              0.0804 |                                   0 | 10.0.50.112 | 3306 |        23 |
| fdc           | quickstart_kafka10 |    58426 | Succeeded   |            2547913 |   4.145809 |          1576635450.419766 | Succeeded             |                      106166 | 4                         |               7437100 |             7543266 |             1.459022 |                        19548721 |                              NULL |                              0.0784 |                                   0 | 10.0.50.112 | 3306 |         4 |
| fdc           | quickstart_kafka10 |    58426 | Succeeded   |            2547913 |   4.145809 |          1576635450.419766 | Succeeded             |                      106172 | 13                        |               7437146 |             7543318 |              1.61076 |                        19547217 |                              NULL |                               0.075 |                                   0 | 10.0.50.112 | 3306 |         0 |
| fdc           | quickstart_kafka10 |    58426 | Succeeded   |            2547913 |   4.145809 |          1576635450.419766 | Succeeded             |                      106166 | 3                         |               7437100 |             7543266 |             1.043879 |                        19547489 |                              NULL |                              0.0778 |                                   0 | 10.0.50.112 | 3306 |        11 |
| fdc           | quickstart_kafka10 |    58426 | Succeeded   |            2547913 |   4.145809 |          1576635450.419766 | Succeeded             |                      106172 | 7                         |               7437100 |             7543272 |             1.443723 |                        19545911 |                              NULL |                              0.0754 |                                   0 | 10.0.50.112 | 3306 |        19 |
| fdc           | quickstart_kafka10 |    58426 | Succeeded   |            2547913 |   4.145809 |          1576635450.419766 | Succeeded             |                      106172 | 19                        |               7437146 |             7543318 |             1.010057 |                        19547055 |                              NULL |                              0.0816 |                                   0 | 10.0.50.112 | 3306 |        16 |
| fdc           | quickstart_kafka10 |    58426 | Succeeded   |            2547913 |   4.145809 |          1576635450.419766 | Succeeded             |                      106154 | 9                         |               7437146 |             7543300 |             0.978359 |                        19543815 |                              NULL |                               0.077 |                                   0 | 10.0.50.112 | 3306 |        10 |
| fdc           | quickstart_kafka10 |    58426 | Succeeded   |            2547913 |   4.145809 |          1576635450.419766 | Succeeded             |                      106166 | 5                         |               7437100 |             7543266 |             1.448879 |                        19542030 |                              NULL |                              0.0764 |                                   0 | 10.0.50.112 | 3306 |         1 |
| fdc           | quickstart_kafka10 |    58426 | Succeeded   |            2547913 |   4.145809 |          1576635450.419766 | Succeeded             |                      106151 | 8                         |               7437116 |             7543267 |             1.459546 |                        19546466 |                              NULL |                              0.0654 |                                   0 | 10.0.50.112 | 3306 |         7 |
| fdc           | quickstart_kafka10 |    58426 | Succeeded   |            2547913 |   4.145809 |          1576635450.419766 | Succeeded             |                      106154 | 14                        |               7437146 |             7543300 |             1.656411 |                        19542129 |                              NULL |                              0.0752 |                                   0 | 10.0.50.112 | 3306 |        12 |
| fdc           | quickstart_kafka10 |    58426 | Succeeded   |            2547913 |   4.145809 |          1576635450.419766 | Succeeded             |                      106179 | 18                        |               7437163 |             7543342 |             1.442558 |                        19544220 |                              NULL |                              0.1006 |                                   0 | 10.0.50.112 | 3306 |         9 |
| fdc           | quickstart_kafka10 |    58426 | Succeeded   |            2547913 |   4.145809 |          1576635450.419766 | Succeeded             |                      106166 | 2                         |               7437102 |             7543268 |             1.469122 |                        19540252 |                              NULL |                              0.0684 |                                   0 | 10.0.50.112 | 3306 |         5 |
| fdc           | quickstart_kafka10 |    58426 | Succeeded   |            2547913 |   4.145809 |          1576635450.419766 | Succeeded             |                      106166 | 1                         |               7437084 |             7543250 |             1.447215 |                        19548063 |                              NULL |                              0.0768 |                                   0 | 10.0.50.112 | 3306 |        18 |
| fdc           | quickstart_kafka10 |    58426 | Succeeded   |            2547913 |   4.145809 |          1576635450.419766 | Succeeded             |                      106172 | 20                        |               7437146 |             7543318 |             1.443822 |                        19543606 |                              NULL |                              0.0944 |                                   0 | 10.0.50.112 | 3306 |        22 |
| fdc           | quickstart_kafka10 |    58426 | Succeeded   |            2547913 |   4.145809 |          1576635450.419766 | Succeeded             |                      106173 | 16                        |               7437145 |             7543318 |             1.028771 |                        19543899 |                              NULL |                              0.0766 |                                   0 | 10.0.50.112 | 3306 |        20 |
| fdc           | quickstart_kafka10 |    58426 | Succeeded   |            2547913 |   4.145809 |          1576635450.419766 | Succeeded             |                      106156 | 22                        |               7437163 |             7543319 |             1.003458 |                        19542949 |                              NULL |                              0.0812 |                                   0 | 10.0.50.112 | 3306 |        21 |
| fdc           | quickstart_kafka10 |    58426 | Succeeded   |            2547913 |   4.145809 |          1576635450.419766 | Succeeded             |                      106170 | 15                        |               7437146 |             7543316 |             1.456105 |                        19544174 |                              NULL |                              0.0774 |                                   0 | 10.0.50.112 | 3306 |         6 |
| fdc           | quickstart_kafka10 |    58426 | Succeeded   |            2547913 |   4.145809 |          1576635450.419766 | Succeeded             |                      106150 | 6                         |               7437116 |             7543266 |             1.447423 |                        19537783 |                              NULL |                              0.0778 |                                   0 | 10.0.50.112 | 3306 |         3 |
| fdc           | quickstart_kafka10 |    58426 | Succeeded   |            2547913 |   4.145809 |          1576635450.419766 | Succeeded             |                      106171 | 12                        |               7437146 |             7543317 |             1.607721 |                        19543512 |                              NULL |                               0.083 |                                   0 | 10.0.50.112 | 3306 |        15 |
| fdc           | quickstart_kafka10 |    58426 | Succeeded   |            2547913 |   4.145809 |          1576635450.419766 | Succeeded             |                      106177 | 21                        |               7437163 |             7543340 |             1.018147 |                        19547605 |                              NULL |                              0.0658 |                                   0 | 10.0.50.112 | 3306 |        13 |
+---------------+--------------------+----------+-------------+--------------------+------------+----------------------------+-----------------------+-----------------------------+---------------------------+-----------------------+---------------------+----------------------+---------------------------------+-----------------------------------+-------------------------------------+-------------------------------------+-------------+------+-----------+
20 rows in set (0.12 sec)
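It is informative to compare the slowest batch (58426) with the most recent one (58484). Batch 58426 wrote about 45% more rows (2,547,913 vs 1,752,355, roughly 106,000 vs 73,000 offsets per partition). However, it took about 2.35 times as long (4.15 s vs 1.76 s), so its throughput was only about 62% as high. In these two samples, batch time grew faster than batch size.

A hypothetical sketch of the comparison follows. If the pattern holds more broadly, capping batch size might be worth testing, for example with pipelines_max_offsets_per_batch_partition from the options listed earlier. These two data points alone do not establish that.

```python
from typing import NamedTuple


class BatchSample(NamedTuple):
    batch_id: int
    rows_written: int     # BATCH_ROWS_WRITTEN
    batch_time_s: float   # BATCH_TIME


def rows_per_second(b: BatchSample) -> float:
    return b.rows_written / b.batch_time_s


def compare_batches(base: BatchSample, other: BatchSample) -> dict[str, float]:
    """Ratios of `other` relative to `base` (values > 1 mean `other` is larger)."""
    return {
        "rows": other.rows_written / base.rows_written,
        "time": other.batch_time_s / base.batch_time_s,
        "throughput": rows_per_second(other) / rows_per_second(base),
    }


recent = BatchSample(58484, 1752355, 1.761614)   # most recent batch
slowest = BatchSample(58426, 2547913, 4.145809)  # slowest batch
for name, ratio in compare_batches(recent, slowest).items():
    print(f"{name}: {ratio:.2f}x")
```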

Thanks.


Hello, sasha.

I have an additional question about the number of pipelines.
How does the number of pipelines relate to performance?

Thank you.