This question is critical to my architecture with SingleStore.
I need to concurrently process rows that have a specific field “should_be_processed == TRUE” with workers. Once the work is done on each row, the should_be_processed field is updated to FALSE.
Given a rowstore table with:
id VARCHAR(30) NOT NULL,
shard_key VARCHAR(30) NOT NULL,
should_be_processed BOOLEAN NOT NULL DEFAULT FALSE,
PRIMARY KEY (id),
KEY (should_be_processed),
SHARD KEY(shard_key)
The following request SELECT shard_key FROM my_table WHERE should_be_processed IS TRUE LIMIT 1 FOR UPDATE;
throws “Error 1706: Leaf Error (127.0.0.1:3307): Feature 'FOR UPDATE clause on a query scanning a secondary key' is not supported by MemSQL”
I tried to use a dedicated ids_to_be_processed rowstore TABLE that only contains IDs to be processed to leverage the PRIMARY KEY, but it leads to all concurrent workers to wait for the exact same row and throws:
leaf Error (127.0.0.1:3307): Lock wait timeout exceeded; try restarting transaction. Row lock owned by connection id 1357, query open idle transaction
Its potentially not ideal, but you can hint the primary key use in the query that does the FOR UPDATE
SELECT shard_key FROM my_table use index() WHERE should_be_processed IS TRUE LIMIT 1 FOR UPDATE
That will scan the primary key.
I’m not sure exactly how your app is structured, but keep in mind that singlestore supports READ COMMITTED isolation level. So if your app scans the table twice in the same transaction, the 2nd scan can pick up newly committed rows (so if the first scan locked the rows with FOR UPDATE, a 2nd scan could see other rows not locked by the first). Stronger isolation levels are coming later this year.
I actually didn’t mention that I tried to add the FORCE KEY(PRIMARY) in my tests, it solved the first issue but I’m still stuck with this error when spawning many concurrent workers:
leaf Error (127.0.0.1:3307): Lock wait timeout exceeded; try restarting transaction. Row lock owned by connection id 1357, query open idle transaction
Looks like the whole table is locked and not just one row. Are you aware of such behavior please?
FOR UPDATE will only lock rows returned by the SELECT. Are you sure the app isn’t generating a real deadlock (locking the same set of rows in different orders?). The locks acquired by FOR UPDATE will be held until the transaction commits.
BEGIN
SELECT … FOR UPDATE
COMMIT <- locks held until commit
You can check the output of information_schema.mv_blocked_queries while the app is running to see if there is such a deadlock.
I think you were on the right track. Your test case is doing SELECT id FROM test LIMIT 1 FOR UPDATE. This query can lock more rows on the leaves then are returned (contrary to my statement above) as the limit will get pushed down to the leaves and the aggregator will then choose 1 row to return (any rows the leaves returned will end up locked).
I’m not sure exactly what you want for your use case, but if you want to lock “any single row” then something like:
SELECT id FROM test LIMIT 1;
BEGIN;
SELECT id FROM test where id = <id from first query> FOR UPDATE;
...
Note that when doing this in many threads they are going to pick from the same small set of rows for the limit 1 query (LIMIT 1 will end up picking the first row in primary key order from any partition - so you only have num partition possible choices for that query).
Thanks for the explanations, I understand now why all the rows are locked and only one got processed.
The SELECT id FROM test LIMIT 1; in your example is out of the transaction, so concurrent connections will select the same id and it won’t solve the problem…
My use case is to get concurrent workers getting and locking a unique row with the guarantee that this row won’t get locked by another similar transaction.
If the behavior of the FOR UPDATE statement forces the leaf to lock the whole table, then can we agree that the implementation of this SQL feature is not correct? and it’s actually impossible to do atomic concurrent processing with SingleStore?
FOR UPDATE doesn’t lock the entire table. In the case of a limit 1 it locks more rows then it returns and those rows are somewhat random at each leaf so you can get end up with deadlocks (it likely ends up locking 1 row per partition - the agg sends the limit 1 to each partition and in the end picks one of them to return).
Also, what is the issue with the example above? If a concurrent delete removes the row you may need to retry again. Even with the limit 1 as you have it in your test case above (directly inside the transaction) you can still get two SELECTs that block each other (They both select the same row and one waits on the other).
something like this in pseudocode:
while true:
SELECT id FROM test LIMIT 1;
BEGIN;
SELECT id FROM test where id = <id from first query> FOR UPDATE;
if <no row returned from SELECT FOR UPDATE>:
ROLLBACK
continue
DELETE ...
COMMIT
The workers are all selecting the same id, and are queued because the 1st worker locked it with FOR UPDATE. After the commit (1sec sleep in the test), all the queued workers get retried.
This pattern leads to a serial processing and wastes concurrency resources (DB overhead with queues…).
Is there a way to safely get a unique row from a table for concurrent processing with SQL?
Your test case is doing SELECT id FROM test LIMIT 1 FOR UPDATE. This query can lock more rows on the leaves then are returned (contrary to my statement above) as the limit will get pushed down to the leaves and the aggregator will then choose 1 row to return (any rows the leaves returned will end up locked).
Would it be possible in a future release of SingleStore to improve this locking behavior on the leaves to just lock the only row that is returned please?
I can’t think of a really good way to lock a single random row from the table. We have ORDER BY RAND() LIMIT 1 FOR UPDATE but that is also going to lock a lot of rows it won’t return. We don’t have good support for LIMIT in conjunction with FOR UPDATE.
Hi Pierre! This is a very interesting problem and something we can absolutely help with. We actually have used similar patterns for some of our internal tools in the past. Here is an example of how you can solve this with no explicit transactions using a secondary table: GitHub - carlsverre/singlestore-atomic at v3
The SELECT FOR UPDATE is certainly the standard way to do this in most relational databases and we will look into this a bit more to determine if it might be usable without any issues.
Regarding the approach I wrote above, please note that for a production solution you will need to handle the case where no rows are returned as well as the case that a worker crashes (thus leaving the row permanently locked). In the past we found a timeout solution worked very well with each worker periodically pinging the row to keep it alive. It’s not perfect but it can scale to large volumes in SingleStore.
Thanks for the idea, here is an adapted version of your code with an infinite loop and a semaphore pattern to saturate the workers until the context timeout: GitHub - pierre-b/singlestore-atomic at v4
I guess the whole magic in your query comes from the RAND() to avoid workers selecting the same rows as much as possible, but it doesn’t seem to scale when the table contains too much rows to shuffle:
Sorry for the delay Pierre. You are correct, I added the RAND() to minimize the duplicate key collisions, however due to the insane performance of SingleStore it actually is faster to accept collisions and instead not do the order by at all. If you remove the ORDER BY and run the benchmark again you should see a much higher process rate.
In the real world workers won’t be dequeuing at exactly the same point in time anyways. You can simulate that by adding a small random jitter to your simulated work by each worker. This will cause the retry-rate to drop more. Another option is to have your works dequeue multiple tasks per cycle. In the real world it might make sense to do this if each task takes a very small amount of time to reduce the round trips to the server and further increase performance. This queue model I shared with you can easily be extended to handle all of these use-cases and more.
The results are more satisfying: with 100k rows at 145 rows / sec (200 concurrent workers).
The processing rate drops and the retry rate doubles when using 300 concurrent workers though… feels like I reached the cluster limits.
If this pattern is the most mature for SingleStore at the moment, then the highest processing rate of SingleStore for processing rows atomically and concurrently is around 150 rows / sec.
PS: unless you have new ideas, I will mark the implementation of your last pattern as the solution.
Nicely done! I would test how this scales to larger cluster sizes. I know that we have achieved much higher rps using this pattern. Also scale your workers out onto multiple machines so they don’t compete with singlestore resources or each other. I bet you will be pleasantly surprised.