Is it possible to flag a column as having all values within the same shard?

Hey

For context:

  • We track users, they each have an id: user_id.
  • For each user we track when they use our app, each instance of app usage is referred to as a session. A user can have many sessions. Each session have an id: session_id.

Currently, our primary use cases are focusing on analyzing sessions, therefore we have sharded the majority of our tables on session_id. Performance is great and all is good.

Now we’ve been tasked with also focusing on analyzing users. This means that our sharding on session_id is no longer optimal, as we cant be sure that all sessions related to a user is on the same partition. We could change the sharding such that we shard on user_id, but then I doubt the query planner would know, that all session_ids within the partition would actually be unique to that partition, which would cause all of our current queries to be less optimal. Using (user_id, session_id) as the shard key, probably wouldn’t be of much help either, as the user_id would still be spread across different partitions, right?

I’ll try and give a very simplified example of the issue…

Using session_id as shard key, I assume data would be distributed sort of like this:

+-------------------------------------------+
| user_id | session_id | log_id | partition |
+-------------------------------------------+
| 1       | 1          | 11     | 1         |
| 1       | 1          | 12     | 1         |
| 1       | 1          | 13     | 1         |
| 1       | 2          | 32     | 2         |
| 2       | 3          | 65     | 3         |
| 3       | 4          | 68     | 4         |
+-------------------------------------------+

With this I can easily do count(distinct session_id) locally, but doing count(distinct user_id) needs to have data distributed between nodes.

Using user_id as shard key, I assume data would be distributed sort of like this:

+-------------------------------------------+
| user_id | session_id | log_id | partition |
+-------------------------------------------+
| 1       | 1          | 11     | 1         |
| 1       | 1          | 12     | 1         |
| 1       | 1          | 13     | 1         |
| 1       | 2          | 32     | 1         |
| 2       | 3          | 65     | 2         |
| 3       | 4          | 68     | 3         |
+-------------------------------------------+

With this I can easily do count(distinct user_id) locally, but doing count(distinct session_id) needs to have data distributed between nodes.

Using (user_id, session_id) as shard key, I assume data would be distributed sort of like this:

+-------------------------------------------+
| user_id | session_id | log_id | partition |
+-------------------------------------------+
| 1       | 1          | 11     | 2         |
| 1       | 1          | 12     | 2         |
| 1       | 1          | 13     | 2         |
| 1       | 2          | 32     | 3         |
| 2       | 3          | 65     | 1         |
| 3       | 4          | 68     | 3         |
+-------------------------------------------+

With this doing either count(distinct session_id) or count(distinct user_id) needs to have data distributed between nodes, because the query planner cant know for sure that a session_id is always tied to the same user_id.

So I guess my question is, how do i define my shard key, such that both count(distinct session_id) and count(distinct user_id) can be done locally on the partition? Is it possible to state some relation between two columns like:

create table log (
  id bigint not null,
  user_id bigint not null,
  session_id bigint not null unique by (user_id),
  timestamp datetime not null series timestamp,
  shard key (user_id),
  sort key (timestamp),
  primary key (id, session_id, user_id)
);

Also just want to note that creating a projection table with user_id as the shard key instead of session_id, is currently not an option as that would double the size of data and therefore also storage cost (right?).

Hi, check out Projections: Projections: A Powerful New Way to Speed Up Queries in SingleStore CREATE PROJECTION · SingleStore Documentation

@tom thanks for your reply, but I specifically listed projections as not an option.

ah sorry, didn’t see it :sweat_smile: sadly projections are the only solution to this problem

Do you know if there is an upcoming feature that would support my request?
And do you know if I’m correct in my assumption, that using projections on a table will double its storage cost?

And thanks for clarifying that currently projections are the only way.

Using a projection that includes all the columns of the original table can roughly double the storage space needed. However, you may not need to include all columns of the table in the projection’s SELECT list if the projection is only for speeding up certain queries that don’t need the other columns. In that case, you could save a significant amount of storage but still get the speedups you want from the projection.

Also, storage is cheaper than compute time (VCPUs) typically, so it might make sense to use more storage if you save a lot of CPU time by getting better plans for more of the queries in your workload.

Yes, surely I can look at limiting the amount of columns I need, but it’ll likely be close to the same amount as in the original table. And storage is definitely cheaper, however we currently have around 65 billion entries in just one of the tables where I would then need to do a projection. So though it’s not as expensive as compute time, it’ll be noticeable increase in storage cost. Again, I’ll take your valuable input back to my team and we’ll discuss what to do from here on out.

I do however, believe it would be a nice addition to somehow define relations like the one I’m having, such that the query planner can optimize based on that input. I guess it would be applicable in many cases where there’s a hierarchal structure to the data. Would it somehow be possible to get this added as a feature request?

What you have is is a functional dependency where session_id functionally determines user_id.

F(session_id) ==> user_id

Moreover, neither session_id nor user_id are unique.

And you’d like it if the system used this information so that if you shard on user_id that the system can do local group bys on session_id as well as user_id.

I will open a feature request for this but honestly I’d hesitate to invest in this feature because it takes a lot of sophistication on the part of the user to utilize it and it would not get used much.

Would you use this feature if we had something like an unenforced functional dependency with a RELY option (like we have for unenforced unique keys with RELY option?). Then you could get wrong query answers if the functional dependency was not true in your data.

For our own reference, the feature requests opened for this is PM-3093.

1 Like

Yes, I most certainly would use it even if unenforced. I purely want to feed information about the data to the cluster, such that it can use it for query optimisation. I don’t expect it to enforce or guarantee anything. So what you’re describing is pretty much spot on.

Thanks for helping with this!

One of our engineers suggested a possible workaround. Suppose you have

T(A, B, shard key(B))

and A functionally determines B

You could write a query like this:

select A, count(*) 
from t 
group by A, B;

The query will get a local group-by because the shard key is contained within the group by columns. And the result will be the same as if you grouped only by A since A determines B.

With your data:

create table sessionlog(user_id int, session_id int, shard key(user_id));
insert sessionlog values(1,1),(1,1),(1,1),(1,2),(2,3),(3,4);

select session_id, count(*)
from sessionlog
group by session_id, user_id;

+------------+----------+
| session_id | count(*) |
+------------+----------+
|          3 |        1 |
|          1 |        3 |
|          2 |        1 |
|          4 |        1 |
+------------+----------+

explain select session_id, count(*) from sessionlog group by session_id, user_id;
+------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                |
+------------------------------------------------------------------------------------------------------------------------+
| Gather partitions:all est_rows:6 alias:remote_0 parallelism_level:sub_partition                                        |
| Project [sessionlog.session_id, `count(*)`] est_rows:6                                                                 |
| HashGroupBy [COUNT(*) AS `count(*)`] groups:[sessionlog.session_id, sessionlog.user_id]                                |
| ColumnStoreFilter [<after per-thread scan begin> AND <before per-thread scan end>]                                     |
| ColumnStoreScan db1.sessionlog, SORT KEY __UNORDERED () table_type:sharded_columnstore est_table_rows:6 est_filtered:6 |
+------------------------------------------------------------------------------------------------------------------------+

And the above result is the same as this:

select session_id, count(*) from sessionlog group by session_id;
+------------+----------+
| session_id | count(*) |
+------------+----------+
|          1 |        3 |
|          3 |        1 |
|          4 |        1 |
|          2 |        1 |
+------------+----------+

But this query can’t do strictly local group by – notice the additional group-by operator in the plan.

 explain select session_id, count(*) from sessionlog group by session_id;
+------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                |
+------------------------------------------------------------------------------------------------------------------------+
| Project [remote_0.session_id, CAST(COALESCE($0,0) AS SIGNED) AS `count(*)`] est_rows:4                                 |
| HashGroupBy [SUM(remote_0.`count(*)`) AS $0] groups:[remote_0.session_id]                                              |
| Gather partitions:all est_rows:4 alias:remote_0 parallelism_level:segment                                              |
| Project [sessionlog.session_id, `count(*)`] est_rows:4                                                                 |
| HashGroupBy [COUNT(*) AS `count(*)`] groups:[sessionlog.session_id]                                                    |
| ColumnStoreScan db1.sessionlog, SORT KEY __UNORDERED () table_type:sharded_columnstore est_table_rows:6 est_filtered:6 |
+------------------------------------------------------------------------------------------------------------------------+
1 Like

Thanks hanson, I’ll keep this in mind. Do you have an idea on how much the additional group by would “cost” when considering we’re going to run the queries on tables with above 4 billion entries?

And I guess if we were to do count(distinct session_id) then we’d also have to use the group by user_id and then wrap that as an inner query. Like if we had the unenforced functionality as we talked about, I could probably just do:

select count(distinct session_id)
  from sessionlog
 where timestamp between X and Y

With your suggestion I’d need to do:

select sum(x.sessions) sessions
  from (  select user_id, count(distinct session_id) sessions
            from sessionlog
           where timestamp between X and Y
        group by user_id
       ) x

To make use of the sharding being on user_id, right?

I’m not sure what you mean by “additional group” because there’s only one group by clause in my example. If you are asking about the cost of the extra group by column, there is some but it is probably a lot less than cost of a shuffle. To be sure, you’d have to run it and check.

Since session_id ==> user_id I think you could just write

select count(distinct session_id, user_id)
  from sessionlog
 where timestamp between X and Y

and get the same result.

The query optimization for count distinct operations is done similar to GROUP BY, so the query I wrote probably won’t have a shuffle because the list of distict columns contains the shard key.

1 Like