Kafka Pipelines and Query Tuning
Ingesting real-time data from the International Space Station (ISS)
1. Drop the database if it exists, create a new database, switch to it, and then create a table.
Database Name
In the following cell, enter your email address, which will be used as the database name. Every character that is not an underscore or alphanumeric must be replaced with an underscore; the cell after it does this for you.
Example:
If your email address is lorrin.smith-bates@singlestore.com you would use lorrin_smith_bates_singlestore_com
In [1]:
email_address = "<< enter your email address >>"
Remove characters that can't be used in a database name.
In [2]:
import re

modified_email_address = re.sub(r'[^A-Za-z0-9]', '_', email_address)
modified_email_address
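The substitution can also be wrapped in a small helper that refuses to run while the placeholder text is still in place. This is only a sketch: `to_database_name` is a hypothetical name and the placeholder check is an addition for illustration, not part of the notebook or of any SingleStore library.

```python
import re


def to_database_name(email_address: str) -> str:
    """Replace every character that is not A-Z, a-z, or 0-9 with an underscore."""
    if not email_address or "<<" in email_address:
        # Illustrative guard: the "<< enter your email address >>" placeholder
        # was never replaced, so stop before creating an oddly named database.
        raise ValueError("enter a real email address first")
    return re.sub(r"[^A-Za-z0-9]", "_", email_address)


print(to_database_name("lorrin.smith-bates@singlestore.com"))
# prints: lorrin_smith_bates_singlestore_com
```

Assigning the result to `modified_email_address` would leave the later `{{ modified_email_address }}` references working as before.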
In [3]:
%%sql
DROP DATABASE IF EXISTS {{ modified_email_address }};
CREATE DATABASE {{ modified_email_address }};
USE {{ modified_email_address }};

CREATE TABLE iss_location (
    name varchar(10),
    id int,
    latitude float,
    longitude float,
    velocity float,
    visibility varchar(20),
    footprint float,
    timestamp bigint,
    daynum float,
    solar_lat float,
    solar_lon float,
    units varchar(20),
    url varchar(255)
);
2. Create a SingleStore pipeline to ingest ISS data from a Kafka topic.
In [4]:
%%sql
CREATE OR REPLACE PIPELINE iss_pipeline AS
LOAD DATA kafka '100.25.125.23/iss'
INTO TABLE iss_location
FORMAT JSON;
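The pipeline declares no explicit column mapping, so it relies on the top-level keys of each JSON message lining up with the column names of iss_location. The Python sketch below mimics that name-based matching for a single message. It is a toy model, not how SingleStore is implemented; `message_to_row` is a hypothetical helper and every value in the sample message is made up (only the key names come from the table definition).

```python
import json

# Column names taken from the iss_location table definition, in order.
ISS_LOCATION_COLUMNS = [
    "name", "id", "latitude", "longitude", "velocity", "visibility",
    "footprint", "timestamp", "daynum", "solar_lat", "solar_lon",
    "units", "url",
]


def message_to_row(message: str) -> tuple:
    """Pick one value per column by looking up the same-named JSON key."""
    record = json.loads(message)
    # A missing key raises KeyError here; this sketch does not model defaults.
    return tuple(record[column] for column in ISS_LOCATION_COLUMNS)


# Made-up sample message: values are placeholders, not real ISS data.
sample_message = json.dumps({
    "name": "iss", "id": 25544, "latitude": 0.0, "longitude": 0.0,
    "velocity": 0.0, "visibility": "daylight", "footprint": 0.0,
    "timestamp": 0, "daynum": 0.0, "solar_lat": 0.0, "solar_lon": 0.0,
    "units": "kilometers", "url": "https://example.com/",
})

row = message_to_row(sample_message)
print(dict(zip(ISS_LOCATION_COLUMNS, row)))
```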
3. Test the pipeline.
In [5]:
%%sql
TEST PIPELINE iss_pipeline;
4. Start the pipeline.
In [6]:
%%sql
START PIPELINE iss_pipeline;
5. Get the count of records. Run this a few times to see the number of records ingested.
In [7]:
%%sql
SELECT COUNT(*) FROM iss_location;
6. Get the latest location record. Click the link to see the location of the ISS in Google Maps.
In [8]:
%%sql
SELECT timestamp, url
FROM iss_location
ORDER BY timestamp DESC
LIMIT 1;
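The timestamp column is a bigint, so the value returned by this query is not directly readable. Assuming it holds Unix epoch seconds (an assumption; the notebook does not state the unit), a value copied from the result can be converted to a UTC time in a Python cell. The function name and the sample value are illustrative only.

```python
from datetime import datetime, timezone


def epoch_to_utc(epoch_seconds: int) -> datetime:
    """Interpret an integer as Unix epoch seconds and return an aware UTC datetime."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)


sample_timestamp = 1_700_000_000  # hypothetical value, not taken from the table
print(epoch_to_utc(sample_timestamp).isoformat())
```

If the printed date looks implausible (for example, decades in the future), the column is probably in a different unit such as milliseconds.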
7. Stop the pipeline and delete the data from the iss_location table.
In [9]:
%%sql
STOP PIPELINE iss_pipeline;
DELETE FROM iss_location;
8. Change the pipeline offsets and batch interval. BATCH_INTERVAL is given in milliseconds, so 30000 is 30 seconds.
In [10]:
%%sql
ALTER PIPELINE iss_pipeline
    SET BATCH_INTERVAL 30000
    SET OFFSETS latest;
9. Start the pipeline again.
In [11]:
%%sql
START PIPELINE iss_pipeline;
10. Count the records. Notice how the table is populated now, after altering the pipeline.
In [12]:
%%sql
SELECT COUNT(*) FROM iss_location;
11. Stop the pipeline.
In [13]:
%%sql
STOP PIPELINE iss_pipeline;
Query Optimization
1. Restore the 'employees' database that has been backed up into a public S3 bucket.
For the database name, we'll reuse the modified email address, this time with employees_ prepended.
In [14]:
%%sql
RESTORE DATABASE employees AS employees_{{ modified_email_address }}
FROM S3 'train.memsql.com/employee'
CONFIG '{"region":"us-east-1"}'
CREDENTIALS '{}';
2. Switch to the employees database.
In [15]:
%%sql
USE employees_{{ modified_email_address }};
3. Run a query that joins 4 tables and orders by 4 columns in 3 tables.
In [16]:
%%sql
SELECT e.first_name, e.last_name, d.dept_name, t.title, t.from_date, t.to_date
FROM employees e
INNER JOIN dept_emp de ON e.emp_no = de.emp_no
INNER JOIN departments d ON de.dept_no = d.dept_no
INNER JOIN titles t ON e.emp_no = t.emp_no
ORDER BY e.first_name, e.last_name, d.dept_name, t.from_date
LIMIT 10;
4. Examine the query execution plan using EXPLAIN.
In [17]:
%%sql
EXPLAIN SELECT e.first_name, e.last_name, d.dept_name, t.title, t.from_date, t.to_date
FROM employees e
INNER JOIN dept_emp de ON e.emp_no = de.emp_no
INNER JOIN departments d ON de.dept_no = d.dept_no
INNER JOIN titles t ON e.emp_no = t.emp_no
ORDER BY e.first_name, e.last_name, d.dept_name, t.from_date
LIMIT 10;
5. Profile the query by using PROFILE.
In [18]:
%%sql
PROFILE SELECT e.first_name, e.last_name, d.dept_name, t.title, t.from_date, t.to_date
FROM employees e
INNER JOIN dept_emp de ON e.emp_no = de.emp_no
INNER JOIN departments d ON de.dept_no = d.dept_no
INNER JOIN titles t ON e.emp_no = t.emp_no
ORDER BY e.first_name, e.last_name, d.dept_name, t.from_date
LIMIT 10;
6. Run SHOW PROFILE to view the statistics from an actual run of the query.
In [19]:
%%sql
SHOW PROFILE;
7. Run Visual Profile to see the profile in a GUI format.
Query/Schema Tuning Exercise
Now that we've visualized our query execution plan, let's address some of the issues we've uncovered.
1. Create a reference table for departments.
In [20]:
%%sql
CREATE REFERENCE TABLE departments_ref (
    dept_no CHAR(4) NOT NULL,
    dept_name VARCHAR(40) NOT NULL,
    PRIMARY KEY (dept_no),
    KEY (dept_name)
);

INSERT INTO departments_ref (SELECT * FROM departments);
2. Profile the old and the new queries.
In [21]:
%%sql
-- CONTROL. Here is the original query. We can use this as the control in our experiment.
SELECT e.first_name, e.last_name, d.dept_name, t.title, t.from_date, t.to_date
FROM employees e
INNER JOIN dept_emp de ON e.emp_no = de.emp_no
INNER JOIN departments d ON de.dept_no = d.dept_no
INNER JOIN titles t ON e.emp_no = t.emp_no
ORDER BY e.first_name, e.last_name, d.dept_name, t.from_date
LIMIT 10;

-- IMPROVED. Here is the slightly improved query, which uses the departments_ref table.
SELECT e.first_name, e.last_name, d.dept_name, t.title, t.from_date, t.to_date
FROM employees e
INNER JOIN dept_emp de ON e.emp_no = de.emp_no
INNER JOIN departments_ref d ON de.dept_no = d.dept_no
INNER JOIN titles t ON e.emp_no = t.emp_no
ORDER BY e.first_name, e.last_name, d.dept_name, t.from_date
LIMIT 10;

-- PROFILE them both and observe the differences.
3. Create a titles table with sort and shard keys defined.
In [22]:
%%sql
CREATE TABLE titles_sharded (
    emp_no INT NOT NULL,
    title VARCHAR(50) NOT NULL,
    from_date DATE NOT NULL,
    to_date DATE,
    SORT KEY (emp_no),
    SHARD KEY (emp_no)
);

INSERT INTO titles_sharded (SELECT * FROM titles);
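To see why sharding on the join column helps, here is a toy model in Python. It assumes rows are assigned to partitions by hashing the shard key; the modulo "hash", the partition count, and the sample rows are all made up for illustration and are not SingleStore's actual scheme. The point is that rows from two tables with the same emp_no land on the same partition, so a join on emp_no can be evaluated partition by partition without moving rows between partitions.

```python
from collections import defaultdict

NUM_PARTITIONS = 4  # hypothetical partition count


def partition_for(emp_no: int) -> int:
    """Toy stand-in for a shard-key hash: same key, same partition."""
    return emp_no % NUM_PARTITIONS


def shard(rows):
    """Group rows by the partition chosen from their first field (emp_no)."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[partition_for(row[0])].append(row)
    return partitions


def local_join(left_parts, right_parts):
    """Join on emp_no using only rows that live in the same partition."""
    joined = []
    for p in range(NUM_PARTITIONS):
        titles_by_emp = defaultdict(list)
        for emp_no, title in right_parts.get(p, []):
            titles_by_emp[emp_no].append(title)
        for emp_no, first_name in left_parts.get(p, []):
            for title in titles_by_emp[emp_no]:
                joined.append((emp_no, first_name, title))
    return sorted(joined)


# Made-up rows: (emp_no, first_name) and (emp_no, title).
employees = [(1, "Ada"), (2, "Ben"), (5, "Cy"), (6, "Dee")]
titles = [(1, "Engineer"), (2, "Analyst"), (5, "Engineer"), (5, "Manager")]

result = local_join(shard(employees), shard(titles))
for row in result:
    print(row)
```

Because both tables are partitioned by the same key, the per-partition join finds every match that a join over the full tables would find; employee 6, who has no title row, is simply absent from the output.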
4. Add shard and sort keys to the dept_emp table.
In [23]:
%%sql
CREATE TABLE dept_emp_sharded (
    emp_no INT NOT NULL,
    dept_no CHAR(4) NOT NULL,
    from_date DATE NOT NULL,
    to_date DATE NOT NULL,
    SORT KEY (dept_no),
    SHARD KEY (emp_no),
    KEY (dept_no)
);

INSERT INTO dept_emp_sharded (SELECT * FROM dept_emp);
In [24]:
%%sql
SELECT e.first_name, e.last_name, d.dept_name, t.title, t.from_date, t.to_date
FROM employees e
INNER JOIN dept_emp de ON e.emp_no = de.emp_no
INNER JOIN departments_ref d ON de.dept_no = d.dept_no
INNER JOIN titles_sharded t ON e.emp_no = t.emp_no
ORDER BY e.first_name, e.last_name, d.dept_name, t.from_date
LIMIT 10;
5. Add shard and sort keys to the employees table.
In [25]:
%%sql
CREATE TABLE employees_sharded (
    emp_no INT NOT NULL,
    birth_date DATE NOT NULL,
    first_name VARCHAR(14) NOT NULL,
    last_name VARCHAR(16) NOT NULL,
    hire_date DATE NOT NULL,
    SORT KEY (emp_no),
    SHARD KEY (emp_no)
);

INSERT INTO employees_sharded (SELECT * FROM employees);
In [26]:
%%sql
SELECT e.first_name, e.last_name, d.dept_name, t.title, t.from_date, t.to_date
FROM employees_sharded e
INNER JOIN dept_emp de ON e.emp_no = de.emp_no
INNER JOIN departments_ref d ON de.dept_no = d.dept_no
INNER JOIN titles_sharded t ON e.emp_no = t.emp_no
ORDER BY e.first_name, e.last_name, d.dept_name, t.from_date
LIMIT 10;
Details
About this Template
Create a SingleStore pipeline to track the International Space Station and adjust queries & schema to optimize performance.
This Notebook can be run in Standard and Enterprise deployments.
License
This Notebook has been released under the Apache 2.0 open source license.